6

출력 순서는 주어진 입력

주문이 세트에서 변경되는 isin() 함수 후 아파치 스파크 상이한 열 이름을 갖는 두 개의 세트를 병합 문제 문 위해서는 아니다.

심지어 나는 sort, orderby으로 시도했지만 작동하지 않았습니다.

입력 데이터 1

RowFactory.create("405-048011-62815", "CRC Industries"), 
RowFactory.create("630-0746","Dixon value"), 
RowFactory.create("4444-444","3M INdustries"), 
RowFactory.create("555-55","Dixon coupling valve") 

입력 DATA2 :

RowFactory.create("222-2222-5555", "Tata"), 
RowFactory.create("7777-88886","WestSide"), 
RowFactory.create("22222-22224","Reliance"), 
RowFactory.create("33333-3333","V industries") 


List<Row> data = Arrays.asList(
RowFactory.create("405-048011-62815", "CRC Industries"), 
RowFactory.create("630-0746","Dixon value"), 
RowFactory.create("4444-444","3M INdustries"), 
RowFactory.create("555-55","Dixon coupling valve")); 

StructType schema = new StructType(new StructField[] { 
new StructField("label1", DataTypes.StringType, false,Metadata.empty()), 
new StructField("sentence1", DataTypes.StringType, false,Metadata.empty()) }); 

Dataset<Row> sentenceDataFrame = spark.createDataFrame(data, schema); 

List<String> listStrings = new ArrayList<String>(); 
listStrings.add("405-048011-62815"); 
listStrings.add("630-0746"); 
listStrings.add("4444-444"); 
listStrings.add("555-55"); 

Dataset<Row> matchFound1 = sentenceDataFrame.filter(col("label1").isin(listStrings.stream().toArray(String[]::new))); 
matchFound1.show(); 


listStrings.clear(); 
listStrings.add("222-2222-5555"); 
listStrings.add("7777-88886"); 
listStrings.add("22222-22224"); 
listStrings.add("33333-3333"); 
StringIndexer indexer = new StringIndexer() 
    .setInputCol("label1") 
    .setOutputCol("label1Index1"); 
Dataset<Row> Dataset1 = indexer.fit(matchFound1).transform(matchFound1); 
Dataset1.show(); 


List<Row> data2 = Arrays.asList(
    RowFactory.create("222-2222-5555", "Tata"), 
    RowFactory.create("7777-88886","WestSide"), 
    RowFactory.create("22222-22224","Reliance"), 
    RowFactory.create("33333-3333","V industries")); 
StructType schema2 = new StructType(new StructField[] { 
new StructField("label2", DataTypes.StringType, false,Metadata.empty()), 
new StructField("sentence2", DataTypes.StringType, false,Metadata.empty()) }); 

Dataset<Row> sentenceDataFrame2 = spark.createDataFrame(data2, schema2); 

Dataset<Row> matchFound2 = sentenceDataFrame2.filter(col("label2").isin(listStrings.stream().toArray(String[]::new))); 
matchFound2.show(); 

StringIndexer indexer1 = new StringIndexer() 
    .setInputCol("label2") 
    .setOutputCol("label2Index1"); 
Dataset<Row> Dataset2 = indexer1.fit(matchFound2).transform(matchFound2); 
Dataset2.show(); 

Dataset<Row> Finalresult = Dataset1.join(Dataset2 , Dataset1.col("label1Index1").equalTo(Dataset2.col("label2Index1"))).drop(Dataset1.col("label1Index1")).drop(Dataset2.col("label2Index1")); 
Finalresult.show(); 

실제 출력 :

+----------------+--------------------+-------------+------------+ 
    |   label1|   sentence1|  label2| sentence2| 
    +----------------+--------------------+-------------+------------+ 
    |405-048011-62815|  CRC Industries| 33333-3333|V industries| 
    |  630-0746|   Dixon value|222-2222-5555|  Tata| 
    |  4444-444|  3M INdustries| 7777-88886| WestSide| 
    |   555-55|Dixon coupling valve| 22222-22224| Reliance| 
    +----------------+--------------------+-------------+------------+ 

예상 출력 :

+----------------+--------------------+-------------+------------+ 
    |   label1|   sentence1|  label2| sentence2| 
    +----------------+--------------------+-------------+------------+ 
    |405-048011-62815|  CRC Industries|222-2222-5555|V industries| 
    |  630-0746|   Dixon value| 7777-88886 |  Tata| 
    |  4444-444|  3M INdustries| 22222-22224| WestSide| 
    |   555-55|Dixon coupling valve| 33333-3333 | Reliance| 
    +----------------+--------------------+-------------+------------+ 

답변

7

오히려 문자열 인덱서를하는 것보다 아래로 DataFrame을 monotonically_increasing_id()를 사용하여 고유의 일련 번호와의 지속적인 열을 추가하고 다시 만들 수 있습니다 :

Dataset<Row> Finalresult = Test1.join(Test2 , Test1.col("rowId1").equalTo(Test2.col("rowId2"))); 
:

Dataset<Row> Test2=Dataset2.withColumn("rowId2", monotonically_increasing_id()) ; 
Dataset<Row> Test1=Dataset1.withColumn("rowId1", monotonically_increasing_id()) ; 

그런 다음 두 데이터 집합에 가입