행 단위 작업을 수행하는 함수를 사용하여 집계하려는 Pyspark DataFrame이 있습니다.PySpark DataFrame의 행 단위 집계
는 I는 4 열을 가지고 있고, 열 AI의 각 고유 값을 컬럼 B, C, D
I이 방법에 사용하고있는 행 단위 통합해야 :
- 을
이 사용하는 독특한 값을 가져
A_uniques = df.select('A').distinct()
def func(x): y = df.filter(df.A==x) y = np.array(y.toPandas()) for i in y.shape[0]: y[i,1] = y[i-1,0] y[i,0] = (y[i,0]+y[i,2])/y[i,3] agg = sum(y[:,1]) return agg
A_uniques.rdd.map(lambda x: (x['A'], func(x['A'])))
:
PicklingError: Could not serialize object: Py4JError: An error occurred while calling o64.getnewargs. Trace: py4j.Py4JException: Method getnewargs([]) does not exist at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318) at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326) at py4j.Gateway.invoke(Gateway.java:272) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:214) at java.lang.Thread.run(Thread.java:748)
RDDs에 NumPy와 배열을 저장에 대한 해결책이 있습니까를? 아니면 다른 방식으로이 전체 작업을 수행 할 수 있습니까?
샘플 입력과 출력을 게시하여 다른 접근 방식을 시도해 볼 수 있습니까? –
당신이'groupby ('col')를 찾고 있다고 생각합니다. agg (sum (col2))' –
당신이 가지고있는 문제는 당신이 rdd 변환에서 참조하고 rdd한다는 것입니다. 집계가 내장 된 pyspark 함수를 사용한다면 DataFrame'groupby (...). agg (...)'를 사용할 수 있습니다. 그렇지 않다면 rdd'groupby'와 맞춤 집계를 사용해야 할 수도 있습니다. – ags29