2016-06-09 7 views
1

스파크 데이터 프레임에 대한 사용자 지정 집계 함수를 지정하는 방법이 있는지 궁금합니다. 내가 2 열 idvalue있는 테이블이 있다면 나는과 같이 각 value에 대한리스트로 값을 id을 GROUPBY 및 집계 싶습니다 :스파크 데이터 프레임에 대한 사용자 지정 집계

에서 :

john | tomato 
john | carrot 
bill | apple 
john | banana 
bill | taco 

에 :

john | tomato, carrot, banana 
bill | apple, taco 

데이터 프레임에서 이것이 가능합니까? 나는 오크 파일로 데이터를 읽고 데이터 프레임으로로드되기 때문에 데이터 프레임에 대해 묻습니다. 나는 그것을 RDD로 변환하는 것이 비효율적이라고 생각한다.

답변

6

난 그냥 다음과 같이 간단하게 갈 것 :

import org.apache.spark.sql.functions.collect_list 
val df = Seq(("john", "tomato"), ("john", "carrot"), 
      ("bill", "apple"), ("john", "banana"), 
      ("bill", "taco")).toDF("id", "value") 
// df: org.apache.spark.sql.DataFrame = [id: string, value: string] 

val aggDf = df.groupBy($"id").agg(collect_list($"value").as("values")) 
// aggDf: org.apache.spark.sql.DataFrame = [id: string, values: array<string>] 

aggDf.show(false) 
// +----+------------------------+ 
// |id |values     | 
// +----+------------------------+ 
// |john|[tomato, carrot, banana]| 
// |bill|[apple, taco]   | 
// +----+------------------------+ 

당신은 심지어 기본 rdd를 호출 할 필요가 없습니다.

1

되돌리기 RDD에 대한 작업은이 같은 문제에 대해 가장 잘 작동하는 경향이있다 :

효율로
scala> val df = sc.parallelize(Seq(("john", "tomato"), 
      ("john", "carrot"), ("bill", "apple"), 
      ("john", "bannana"), ("bill", "taco"))) 
      .toDF("name", "food") 
df: org.apache.spark.sql.DataFrame = [name: string, food: string] 

scala> df.show 
+----+-------+ 
|name| food| 
+----+-------+ 
|john| tomato| 
|john| carrot| 
|bill| apple| 
|john|bannana| 
|bill| taco| 
+----+-------+ 

scala> val aggregated = df.rdd 
      .map{ case Row(k: String, v: String) => (k, List(v)) } 
      .reduceByKey{_ ++ _} 
      .toDF("name", "foods") 
aggregated: org.apache.spark.sql.DataFrame = [name: string, foods: array<string>] 

scala> aggregated.collect.foreach{println} 
[john,WrappedArray(tomato, carrot, bannana)] 
[bill,WrappedArray(apple, taco)] 

, 나는 DataFrames는 후드 아래 RDD의 믿는다 그래서 .rdd 같은 변환이 매우 적은 비용을 가지고있다.

관련 문제