0
pyspark 데이터 프레임의 행에 사용자 정의 함수를 적용하려고합니다. 이 함수는 동일한 차원의 행과 다른 두 벡터를 사용합니다. 두 번째 벡터의 행에서 일치하는 각 값에 대한 세 번째 벡터의 값 합계를 출력합니다.pyspark 데이터 프레임을 통한 사용자 정의 함수
import pandas as pd
import numpy as np
기능 :
def V_sum(row,b,c):
return float(np.sum(c[row==b]))
내가 달성하고자하는 팬더와 간단는 :
pd_df = pd.DataFrame([[0,1,0,0],[1,1,0,0],[0,0,1,0],[1,0,1,1],[1,1,0,0]], columns=['t1', 't2', 't3', 't4'])
t1 t2 t3 t4
0 0 1 0 0
1 1 1 0 0
2 0 0 1 0
3 1 0 1 1
4 1 1 0 0
B = np.array([1,0,1,0])
V = np.array([5,1,2,4])
pd_df.apply(lambda x: V_sum(x, B, V), axis=1)
0 4.0
1 9.0
2 7.0
3 8.0
4 9.0
dtype: int64
나는 pyspark에서 동일한 작업을 수행하고 싶습니다.
from pyspark import SparkConf, SparkContext, SQLContext
sc = SparkContext("local")
sqlContext = SQLContext(sc)
spk_df = sqlContext.createDataFrame([[0,1,0,0],[1,1,0,0],[0,0,1,0],[1,0,1,1],[1,1,0,0]], ['t1', 't2', 't3', 't4'])
spk_df.show()
+---+---+---+---+
| t1| t2| t3| t4|
+---+---+---+---+
| 0| 1| 0| 0|
| 1| 1| 0| 0|
| 0| 0| 1| 0|
| 1| 0| 1| 1|
| 1| 1| 0| 0|
+---+---+---+---+
나는 UDF를 사용하여 생각하지만 난 그게 동작하지 않습니다 명확하게 얻을 수 있기 때문에 내가 뭔가를 잘못하고 있어요
from pyspark.sql.types import FloatType
import pyspark.sql.functions as F
V_sum_udf = F.udf(V_sum, FloatType())
spk_df.select(V_sum_udf(F.array(*(F.col(x) for x in spk_df.columns))).alias("results")).show()
:
Py4JJavaError: An error occurred while calling o27726.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 90.0 failed 1 times, most recent failure: Lost task 0.0 in stage 90.0 (TID 91, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
답장을 보내 주셔서 감사합니다. 귀하의 코드를 실행 해봤는데, show()로 결과를 표시하려고 시도 할 때까지 아무런 오류도 던지지 않습니다. – Haboryme
spk_df.withColumn ("results", v_sum_udf (spk_df.columns)의 x에 대해 F.array (* (F.col (x))))) 올바른 결과를 제공합니다. 나를 올바른 방향으로 가리켜 주셔서 감사합니다. – Haboryme