2016-10-23 3 views
0

나는 파이썬에서 스파크를 배우려고 노력 중이며 키 - 값 쌍의 값을 평균화하기 위해 combineByKey과 붙어있다. 실제로, 나의 혼란은 combineByKey 문법과 다르다. O'Rielly 2015 Learning Spark Book의 전형적인 예는 여러 곳의 웹에서 볼 수 있습니다. here's one.파이썬 스파크 combineByKey 평균

문제는 sumCount.map(lambda (key, (totalSum, count)): (key, totalSum/count)).collectAsMap() 문과 함께 있습니다. spark 2.0.1 및 iPython 3.5.2를 사용하면 구문 오류 예외가 발생합니다. 작동시킬 수있는 것으로 간단하게 (그리고 O'Reilly 책에있는 것) : sumCount.map(lambda key,vals: (key, vals[0]/vals[1])).collectAsMap()은 Spark이 자바 예외로 박쥐 ** t에 미치도록 유도하지만, 나는 TypeError: <lambda>() missing 1 required positional argument: 'v' 오류를 기록합니다.

Spark & Python의 최신 버전에서 실제로 작동하는이 기능의 예제를 알려줄 수 있습니까? 그것은 쉽게

In: pRDD = sc.parallelize([("s",5),("g",3),("g",10),("c",2),("s",10),("s",3),("g",-1),("c",20),("c",2)]) 
In: cbk = pRDD.combineByKey(lambda x:(x,1), lambda x,y:(x[0]+y,x[1]+1),lambda x,y:(x[0]+y[0],x[1]+y[1])) 
In: cbk.collect() 
Out: [('s', (18, 3)), ('g', (12, 3)), ('c', (24, 3))] 
In: cbk.map(lambda key,val:(k,val[0]/val[1])).collectAsMap() <-- errors 

충분히 [(e[0],e[1][0]/e[1][1]) for e in cbk.collect()]을 계산하기 위해,하지만 난 오히려 "Sparkic"방식의 작업을 얻을 것 : 완전성 예를 들어, 나는 내 자신의 최소한의 작업을 포함했다 (또는 오히려, 비 작동). 단계별

답변

2

단계 :

  • lambda (key, (totalSum, count)): ... 파이썬 제거한 Tuple Parameter Unpacking 소위된다.
  • RDD.map은 단일 인수로 예상되는 함수를 사용합니다. 사용하려고하는 함수 :

    두 인수를 필요로하는 함수가 아닌 하나의 인수입니다. 마지막으로 수치 적으로 안정적인 솔루션을 것

    cbk.mapValues(lambda x: x[0]/x[1]) 
    
  • :

    def get_mean(key_vals): 
        key, (total, cnt) = key_vals 
        return key, total/cnt 
    
    cbk.map(get_mean) 
    

    또한 mapValues 훨씬 간단하게이 작업을 할 수 있습니다 : 2.X 구문에서 올바른 번역은

    lambda key_vals: (key_vals[0], key_vals[1][0]/key_vals[1][1]) 
    

    또는 것 be :

    from pyspark.statcounter import StatCounter 
    
    (pRDD 
        .combineByKey(
         lambda x: StatCounter([x]), 
         StatCounter.merge, 
         StatCounter.mergeStats) 
        .mapValues(StatCounter.mean)) 
    
-1

특정 열 값에 대한 평균은 개념을 사용하여 수행 할 수 있습니다. 다음 코드를 고려하십시오

import pyspark.sql.functions as F 
from pyspark.sql import Window 
df = spark.createDataFrame([('a', 2), ('b', 3), ('a', 6), ('b', 5)], 
          ['a', 'i']) 
win = Window.partitionBy('a') 
df.withColumn('avg', F.avg('i').over(win)).show() 

가 얻을 것이다 : 평균 집계 별도로 각 작업자 수행

+---+---+---+ 
| a| i|avg| 
+---+---+---+ 
| b| 3|4.0| 
| b| 5|4.0| 
| a| 2|4.0| 
| a| 6|4.0| 
+---+---+---+ 

, 아니 라운드 호스트에 여행, 따라서 효율적인 필요하지 않습니다.

+0

감사합니다.하지만이 질문에 이미 게시 된 모든 내용이 명확하므로 'combineByKey' ->'map' 연산이 오류를 일으키는 이유에 대해 구체적으로 질문했습니다. –