2016-08-20 3 views
0

제 질문은 PySpark reduceByKey on multiple values과 비슷하지만 다소 차이가 있습니다. 필자는 PySpark를 처음 사용하므로 확실한 내용이 빠져 있습니다.중첩 된 튜플의 Pyspark reduceByKey

(K0, ((k01,v01), (k02,v02), ...)) 
.... 
(Kn, ((kn1,vn1), (kn2,vn2), ...)) 

내가 출력으로 원하는 것은이 reduceByKey를 사용하는 완벽한 경우처럼 보인다

(K0, v01+v02+...) 
... 
(Kn, vn1+vn2+...) 

추천하고 내가 생각했던 처음에 뭔가가 :

나는 다음과 같은 구조를 가진 RDD이 무엇이든 같은 것

rdd.reduceByKey(lambda x,y: x[1]+y[1]) 

내가 시작했던 RDD가 정확히 무엇인지 알 수있다. 와. 중첩 된 튜플이 있기 때문에 내 인덱싱에 문제가 있다고 생각합니다.하지만 생각할 수있는 모든 가능한 인덱스 조합을 시도했으며 초기 RDD를 다시 돌려줍니다.

중첩 된 튜플과 함께 작동해서는 안되는 이유가 있을까요? 아니면 잘못된 것을하고 있습니까?

답변

0

여기에 reduceByKey을 사용하면 안됩니다. 그것은 서명과 연관되고 교환 가능한 함수를 필요로합니다. (T, T) => T. List[Tuple[U, T]]을 입력으로 사용하고 T을 출력으로 사용하는 경우에는 적용 할 수 없습니다.

키 또는 고유한지 여부로 인해 로컬 및 전역으로 집계해야하는 일반적인 경우를 고려할 수 있는지 명확하지 않습니다. v01, v02이 ... vm 간단한 수치 있다고 가정하자 :

from functools import reduce 
from operator import add 

def agg_(xs): 
    # For numeric values sum would be more idiomatic 
    # but lets make it more generic 
    return reduce(add, (x[1] for x in xs), zero_value) 

zero_value = 0 
merge_op = add 
def seq_op(acc, xs): 
    return acc + agg_(xs) 

rdd = sc.parallelize([ 
    ("K0", (("k01", 3), ("k02", 2))), 
    ("K0", (("k03", 5), ("k04", 6))), 
    ("K1", (("k11", 0), ("k12", -1)))]) 

rdd.aggregateByKey(0, seq_op, merge_op).take(2) 
## [('K0', 16), ('K1', -1)] 

키가 이미 간단한 mapValues가 충분 고유 경우

나에게 지금 분명하다
from itertools import chain 

unique_keys = rdd.groupByKey().mapValues(lambda x: tuple(chain(*x))) 
unique_keys.mapValues(agg_).take(2) 
## [('K0', 16), ('K1', -1)] 
+0

. 예, 키는 고유하므로 mapValues ​​방식은 필요한 것입니다. 고맙습니다. –

관련 문제