2014-06-05 3 views
20

나는 (사용자 ID, 이름, 개수) 타입의 튜플 목록을 가지고있다. 예를 들어아파치 스파크 (스칼라)에서 reduceByKey 사용

, 나는 각 요소 이름이 계산되는 유형이 컬렉션을 줄이기 위해 시도하고있어

val x = sc.parallelize(List(
    ("a", "b", 1), 
    ("a", "b", 1), 
    ("c", "b", 1), 
    ("a", "d", 1)) 
) 

.

그래서 위의 발 X에있는 변환됩니다 :

val byKey = x.map({case (id,uri,count) => (id,uri)->count}) 

val grouped = byKey.groupByKey 
val count = grouped.map{case ((id,uri),count) => ((id),(uri,count.sum))} 
val grouped2: org.apache.spark.rdd.RDD[(String, Seq[(String, Int)])] = count.groupByKey 

grouped2.foreach(println) 

나는 그것이 GroupByKey에서보다 더 빠르게 수행으로 reduceByKey를 사용하려고 해요 : 여기

(a,ArrayBuffer((d,1), (b,2))) 
(c,ArrayBuffer((b,1))) 

내가 현재 사용하고있는 코드입니다.

에 동일한 매핑을 제공하려면 위 코드 대신 reduceByKey를 어떻게 구현할 수 있습니까?

val byKey = x.map({case (id,uri,count) => (id,uri)->count}) 

당신은 할 수 : 코드에 따라

답변

26

val reducedByKey = byKey.reduceByKey(_ + _) 

scala> reducedByKey.collect.foreach(println) 
((a,d),1) 
((a,b),2) 
((c,b),1) 

PairRDDFunctions[K,V].reduceByKey는 RDD [의 V 입력 할에 적용 할 수있는 기능을 감소 연관됩니다 (K를, V) ]. 즉, 함수 f[V](e1:V, e2:V) : V이 필요합니다. 이 특별한 경우에는 Ints에 대한 합계가 있습니다 : (x:Int, y:Int) => x+y 또는 _ + _ 짧은 밑줄 표기법.

레코드의 경우 reduceByKey은 셔플/감소 단계 이전에 reduce 함수를 로컬에서 적용하려고하기 때문에 groupByKey보다 우수한 성능을 보입니다. groupByKey은 그룹화하기 전에 모든 요소를 ​​강제로 섞습니다.

+1

기본적으로 reduceByKey는 groupBy를 수행 한 다음 사용자 정의 reduce 함수를 적용한 것과 같은 결과를 얻습니까? – Savvas

+5

@Savvas 최종 결과는 동일하지만'reduceByKey'는 실행 프로그램 당 O (1) 메모리 요구 사항을 가지지 만'groupByKey'는 OOM으로 이어질 수있는 모든 그룹화 된 값을 메모리에 유지해야합니다. – maasg

5

원본 데이터 구조는 RDD [(String, String, Int)]이고 reduceByKey은 데이터 구조가 RDD [(K, V)] 인 경우에만 사용할 수 있습니다.

val kv = x.map(e => e._1 -> e._2 -> e._3) // kv is RDD[((String, String), Int)] 
val reduced = kv.reduceByKey(_ + _)  // reduced is RDD[((String, String), Int)] 
val kv2 = reduced.map(e => e._1._1 -> (e._1._2 -> e._2)) // kv2 is RDD[(String, (String, Int))] 
val grouped = kv2.groupByKey()   // grouped is RDD[(String, Iterable[(String, Int)])] 
grouped.foreach(println) 
+0

'V'가 숫자 여야한다는 제한은 없습니다. 유일한 요구 사항은 함수 f (V, V) => V는 연관 적이어야한다는 것입니다. 일치하지 않으면 결과가 일치하지 않습니다. – maasg

+0

그건 실수예요. 나는 그 사람에게 (_ + _) 생각하고 있었다 : P, 업데이트. – cloud

0

구문은 아래와 같다 :

은 (명확 같은 종류의 것)의 값을 취하는 RDD 동일한 키를 말한다
reduceByKey(func: Function2[V, V, V]): JavaPairRDD[K, V], 

기능의 일부로서 제공하는 동작을 수행 상위 RDD와 동일한 유형의 값을 반환합니다.

관련 문제