2016-11-23 1 views
0

동일한 분할자를 사용하여 이미 분할되고 HDFS에 저장된 두 개의 데이터 세트가 있습니다. 이 데이터 세트는 우리가 제어 할 수없는 두 가지 다른 스파크 작업의 출력입니다. 이제이 두 데이터 세트를 결합하여 다른 정보를 생성 해 보겠습니다.Spark에서 HDFS의 두 데이터 파일을 결합 하시겠습니까?

Example: 

Data Set 1: 
ORDER_ID CUSTOMER_ID ITEMS 
OD1  C1   1,2,3 -> partition 0 
OD2  C2   3,4,5 -> partition 0 
OD3  C4   1,2,3 -> partition 1 
OD4  C3   1,3  -> partition 1 

Data Set 1: 
ORDER_ID CUSTOMER_ID REFUND_ITEMS 
OD1  C1   1  -> partition 0 
OD2  C2   5  -> partition 0 
OD3  C4   2,3 -> partition 1 
OD4  C3   3  -> partition 1 

Options are: 

1) Create two RDDs from the datasets and join them. 
2) Create one RDD using one of the dataset. 
    -> For each partition in the RDD get the actual partition id i.e OD1 -> 0, OD3 -> 1 (using some custom logic) 
    -> Load data from HDFS for that partition for dataset 2 
    -> Iterate over both the dataset and produce combined result. 

For option 2 I don't know how to read a specific file form HDFS in the Spark executor. (I have the full URI for location of the file) 

답변

0

2 개의 데이터 프레임을 생성하고 SQL을 사용하여 결합 할 수 있습니다. 아래 코드를 찾으십시오.

import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder 
import org.apache.spark.sql.Encoder 

// For implicit conversions from RDDs to DataFrames 
import spark.implicits._ 

case class struc_dataset(ORDER_ID: String,CUSTOMER_ID: String, ITEMS:String) 

//Read file1 
val File1DF = spark.sparkContext 
    .textFile("temp/src/file1.txt") 
    .map(_.split("\t")) 
    .map(attributes => struc_dataset(attributes(0), attributes(1),attributes(3))).toDF() 

//Register as Temp view - Dataset1 
File1DF.createOrReplaceTempView("Datset1") 

//Read file2 
val File2DF = spark.sparkContext 
    .textFile("temp/src/file2.txt") 
    .map(_.split("\t")) 
    .map(attributes => struc_dataset(attributes(0),attributes(1),attributes(3))).toDF() 

//Register as Temp view - Dataset2 
File2DF.createOrReplaceTempView("Datset2") 

// SQL statement to create final dataframe (JOIN) 
val finalDF = spark.sql("SELECT * FROM Dataset1 ds1 JOIN Dataset2 ds2 on ds1.ORDER_ID=ds2.ORDER_ID AND ds1.CUSTOMER_ID=ds2.CUSTOMER_ID") 

finalDF.show() 
관련 문제