2017-12-19 3 views
0

행 단위 작업을 수행하는 함수를 사용하여 집계하려는 Pyspark DataFrame이 있습니다.PySpark DataFrame의 행 단위 집계

는 I는 4 열을 가지고 있고, 열 AI의 각 고유 값을 컬럼 B, C, D

I이 방법에 사용하고있는 행 단위 통합해야 :

  1. 이 사용하는 독특한 값을 가져

    A_uniques = df.select('A').distinct() 
    
  2. def func(x): 
        y = df.filter(df.A==x) 
        y = np.array(y.toPandas()) 
        for i in y.shape[0]: 
         y[i,1] = y[i-1,0] 
         y[i,0] = (y[i,0]+y[i,2])/y[i,3] 
        agg = sum(y[:,1]) 
        return agg 
    
  3. A_uniques.rdd.map(lambda x: (x['A'], func(x['A']))) 
    
나는이 오류가 무엇입니까

:

PicklingError: Could not serialize object: Py4JError: An error occurred while calling o64.getnewargs. Trace: py4j.Py4JException: Method getnewargs([]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) at py4j.Gateway.invoke(Gateway.java:272) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748)

RDDs에 NumPy와 배열을 저장에 대한 해결책이 있습니까를? 아니면 다른 방식으로이 전체 작업을 수행 할 수 있습니까?

+0

샘플 입력과 출력을 게시하여 다른 접근 방식을 시도해 볼 수 있습니까? –

+1

당신이'groupby ('col')를 찾고 있다고 생각합니다. agg (sum (col2))' –

+0

당신이 가지고있는 문제는 당신이 rdd 변환에서 참조하고 rdd한다는 것입니다. 집계가 내장 된 pyspark 함수를 사용한다면 DataFrame'groupby (...). agg (...)'를 사용할 수 있습니다. 그렇지 않다면 rdd'groupby'와 맞춤 집계를 사용해야 할 수도 있습니다. – ags29

답변

0

Pyspark에서는 groupBy() (내 경우에는 2 cols로 그룹화) 함수를 사용하여 GroupedDataFrame을 가져오고 agg() 함수를 파이프합니다. 또한 GroupedDataFrame, in this example에 대한) (.apply 기능을 grouped_Series_Owner = x_gb["Owner"].apply(list)을 사용할 수 있습니다

sqlContext.sql("select * from retail_db.orders").groupBy("order_status", "order_date").agg({"order_customer_id": "sum", "order_id": "count"}).show() 

+---------------+--------------------+----------------------+---------------+ 
| order_status|   order_date|sum(order_customer_id)|count(order_id)| 
+---------------+--------------------+----------------------+---------------+ 
|PENDING_PAYMENT|2013-07-28 00:00:...|    237876|    37| 
|  COMPLETE|2013-08-22 00:00:...|    415843|    64| 
|PENDING_PAYMENT|2013-10-20 00:00:...|    168223|    28| 
|SUSPECTED_FRAUD|2013-11-22 00:00:...|     36354|    6| 
|PENDING_PAYMENT|2013-12-19 00:00:...|    131972|    22| 
|PENDING_PAYMENT|2014-03-12 00:00:...|    352832|    52| 
|  ON_HOLD|2014-03-28 00:00:...|     74970|    13| 
|SUSPECTED_FRAUD|2014-04-14 00:00:...|     18145|    2| 
|  PENDING|2014-04-21 00:00:...|    174419|    26| 
|   CLOSED|2014-06-04 00:00:...|     66677|    10| 
|PENDING_PAYMENT|2014-06-26 00:00:...|    249542|    45| 
|PENDING_PAYMENT|2013-08-17 00:00:...|    405980|    56| 
|   CLOSED|2013-09-10 00:00:...|    164670|    23| 
|SUSPECTED_FRAUD|2013-09-19 00:00:...|     26613|    4| 
|  PENDING|2013-09-26 00:00:...|    176547|    28| 
|  COMPLETE|2013-10-20 00:00:...|    314462|    54| 
|  CANCELED|2013-10-31 00:00:...|     36881|    6| 
|  PROCESSING|2013-11-09 00:00:...|    149164|    23| 
| PAYMENT_REVIEW|2013-11-29 00:00:...|     17368|    3| 
|SUSPECTED_FRAUD|2013-12-11 00:00:...|     45085|    7| 
+---------------+--------------------+----------------------+---------------+ 
only showing top 20 rows 

... 아래의 예를 참조하십시오 나는 목록에 집계 된 데이터를 변환하고 그들과 함께 일했다.

+0

집계 함수는 필자의 경우 훨씬 더 복잡하며 숫자가 적은 배열 연산이 필요합니다. –

+0

은 'DEF (x)는 FUNC : Y = df.filter (df.A는 == X) Y = np.array은 (는 y.toPandas())로 NumPy와 배열을 사용 #Aggregation 여기 agg' 창 일어난다 이 기능이 당신의 기능입니까? 더 복잡한 집계의 경우 RDD [aggregateByKey] (https://spark.apache.org/docs/2.2.1/api/python/pyspark.html?highlight=aggregatebykey#pyspark.RDD.aggregateByKey) 사용을 고려하십시오. – CarloV

+0

agg 사용자 정의 함수 [이 참조] (https://stackoverflow.com/questions/35989558/pyspark-custom-function-in-aggregation-on-grouped-data). – CarloV