2017-02-17 1 views
1

스파크에서는 여러 데이터 프레임을 병렬 처리 할 수 ​​있기를 원합니다.Spark Dataframe을 다른 Dataframe에 중첩시킬 수 있습니까?

내가 시도하는 방법은 상위 데이터 프레임에 데이터 프레임을 중첩하는 것이지만 구문이 확실하지 않거나 가능한지 확실하지 않습니다.

예를 들어 나는 다음과 같은 2 dataframes 있습니다 DF1을 :

+-----------+---------+--------------------+------+ 
|id   |asset_id |    date| text| 
+-----------+---------+--------------------+------+ 
|20160629025|  A1|2016-06-30 11:41:...|aaa...| 
|20160423007|  A1|2016-04-23 19:40:...|bbb...| 
|20160312012|  A2|2016-03-12 19:41:...|ccc...| 
|20160617006|  A2|2016-06-17 10:36:...|ddd...| 
|20160624001|  A2|2016-06-24 04:39:...|eee...| 

DF2 :

+--------+--------------------+--------------+ 
|asset_id|  best_date_time| Other_fields| 
+--------+--------------------+--------------+ 
|  A1|2016-09-28 11:33:...|   abc| 
|  A1|2016-06-24 00:00:...|   edf| 
|  A1|2016-08-12 00:00:...|   hij| 
|  A2|2016-07-01 00:00:...|   klm| 
|  A2|2016-07-10 00:00:...|   nop| 

그래서 나는 이런 식으로 뭔가를 생산하기 위해 이들을 결합하려는. 이들은 매우 희박한 것으로

+--------+--------------------+-------------------+ 
|asset_id|     df1|    df2| 
+--------+--------------------+-------------------+ 
|  A1| [df1 - rows for A1]|[df2 - rows for A1]| 
|  A2| [df1 - rows for A2]|[df2 - rows for A2]| 

주, 나는 (I 실제로 수천 개의 행으로 약 30 dataframes 자산의 수천 각을)를 가입하거나 노조 싶지 않아요.

[('A1', <pyspark.resultiterable.ResultIterable object at 0x2534310>), ('A2', <pyspark.resultiterable.ResultIterable object at 0x25d2310>)] 

내가 그렇게 크게 감사합니다 도움이 촉발하는 새로운 해요 :

은 그때 내가에 함수를 호출 할 수 있습니다 이런 식으로 뭔가를 얻을 수 있도록이에 GroupByKey에서 작업을 수행 할 계획이다.

답변

2

TL : DRDataFrames은 중첩 할 수 없지만 복잡한 유형을 사용할 수 있습니다. 이 경우

당신은 (예를 들어 2.0 이상 또는 스파크) 수 :

from pyspark.sql.functions import collect_list, struct 

df1_grouped = (df1 
    .groupBy("asset_id") 
    .agg(collect_list(struct("id", "date", "text")))) 

df2_grouped = (df2 
    .groupBy("asset_id") 
    .agg(collect_list(struct("best_date_time", "Other_fields")))) 

df1_grouped.join(df2_grouped, ["asset_id"], "fullouter") 

하지만 당신은 알고 있어야합니다 :

  • 그것은 매우 비싸다.
  • 응용 분야가 제한되어 있습니다. 일반적으로 중첩 구조는 사용하기가 번거롭고 복잡하고 비싼 (특히 PySpark에서) UDF가 필요합니다.
+0

유용한 포인터를 제공해 주셔서 감사합니다. – prk

관련 문제