2017-12-01 1 views
0

pyspark 데이터 프레임의 행에 사용자 정의 함수를 적용하려고합니다. 이 함수는 동일한 차원의 행과 다른 두 벡터를 사용합니다. 두 번째 벡터의 행에서 일치하는 각 값에 대한 세 번째 벡터의 값 합계를 출력합니다.pyspark 데이터 프레임을 통한 사용자 정의 함수

import pandas as pd 
import numpy as np 

기능 :

def V_sum(row,b,c): 
    return float(np.sum(c[row==b])) 

내가 달성하고자하는 팬더와 간단는 :

pd_df = pd.DataFrame([[0,1,0,0],[1,1,0,0],[0,0,1,0],[1,0,1,1],[1,1,0,0]], columns=['t1', 't2', 't3', 't4']) 
    t1 t2 t3 t4 
0 0 1 0 0 
1 1 1 0 0 
2 0 0 1 0 
3 1 0 1 1 
4 1 1 0 0 

B = np.array([1,0,1,0]) 
V = np.array([5,1,2,4]) 

pd_df.apply(lambda x: V_sum(x, B, V), axis=1) 
0 4.0 
1 9.0 
2 7.0 
3 8.0 
4 9.0 
dtype: int64 

나는 pyspark에서 동일한 작업을 수행하고 싶습니다.

from pyspark import SparkConf, SparkContext, SQLContext 
sc = SparkContext("local") 
sqlContext = SQLContext(sc) 

spk_df = sqlContext.createDataFrame([[0,1,0,0],[1,1,0,0],[0,0,1,0],[1,0,1,1],[1,1,0,0]], ['t1', 't2', 't3', 't4']) 
spk_df.show() 
+---+---+---+---+ 
| t1| t2| t3| t4| 
+---+---+---+---+ 
| 0| 1| 0| 0| 
| 1| 1| 0| 0| 
| 0| 0| 1| 0| 
| 1| 0| 1| 1| 
| 1| 1| 0| 0| 
+---+---+---+---+ 

나는 UDF를 사용하여 생각하지만 난 그게 동작하지 않습니다 명확하게 얻을 수 있기 때문에 내가 뭔가를 잘못하고 있어요

from pyspark.sql.types import FloatType 
import pyspark.sql.functions as F 

V_sum_udf = F.udf(V_sum, FloatType()) 
spk_df.select(V_sum_udf(F.array(*(F.col(x) for x in spk_df.columns))).alias("results")).show() 

:

Py4JJavaError: An error occurred while calling o27726.showString. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 90.0 failed 1 times, most recent failure: Lost task 0.0 in stage 90.0 (TID 91, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last): 

답변

0

당신이 한 경우 새로운 컬럼을 계산하기 위해 컬럼 데이터와 함 2 함수 내에서 사용하려는 비 컬럼 데이터가있는 경우, here에 설명 된 UDF + 폐쇄 및 withColumn을 시작하는 것이 좋습니다.

B = [2,0,1,0] 
V = [5,1,2,4] 

v_sum_udf = F.udf(lambda row: V_sum(row, B, V), FloatType()) 
spk_df.withColumn("results", v_sum_udf(F.array(*(F.col(x) for x in spk_df.columns)))) 
+0

답장을 보내 주셔서 감사합니다. 귀하의 코드를 실행 해봤는데, show()로 결과를 표시하려고 시도 할 때까지 아무런 오류도 던지지 않습니다. – Haboryme

+1

spk_df.withColumn ("results", v_sum_udf (spk_df.columns)의 x에 대해 F.array (* (F.col (x))))) 올바른 결과를 제공합니다. 나를 올바른 방향으로 가리켜 주셔서 감사합니다. – Haboryme

관련 문제