다른 데이터 프레임에서 UDF를 실행할 때 pyspark 데이터 프레임을 어떻게 참조합니까?Pyspark : 다른 데이터 프레임의 UDF에서 데이터 프레임을 참조하는 방법은 무엇입니까?
여기에 가짜 예제가 있습니다. scores
과 lastnames
이라는 두 개의 데이터 프레임을 만들고 두 개의 데이터 프레임에서 동일한 열이 있습니다. scores
에 적용된 UDF에서 lastnames
을 필터링하고 lastname
에있는 문자열을 반환하고 싶습니다.
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SQLContext
from pyspark.sql.types import *
sc = SparkContext("local")
sqlCtx = SQLContext(sc)
# Generate Random Data
import itertools
import random
student_ids = ['student1', 'student2', 'student3']
subjects = ['Math', 'Biology', 'Chemistry', 'Physics']
random.seed(1)
data = []
for (student_id, subject) in itertools.product(student_ids, subjects):
data.append((student_id, subject, random.randint(0, 100)))
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
schema = StructType([
StructField("student_id", StringType(), nullable=False),
StructField("subject", StringType(), nullable=False),
StructField("score", IntegerType(), nullable=False)
])
# Create DataFrame
rdd = sc.parallelize(data)
scores = sqlCtx.createDataFrame(rdd, schema)
# create another dataframe
last_name = ["Granger", "Weasley", "Potter"]
data2 = []
for i in range(len(student_ids)):
data2.append((student_ids[i], last_name[i]))
schema = StructType([
StructField("student_id", StringType(), nullable=False),
StructField("last_name", StringType(), nullable=False)
])
rdd = sc.parallelize(data2)
lastnames = sqlCtx.createDataFrame(rdd, schema)
scores.show()
lastnames.show()
from pyspark.sql.functions import udf
def getLastName(sid):
tmp_df = lastnames.filter(lastnames.student_id == sid)
return tmp_df.last_name
getLastName_udf = udf(getLastName, StringType())
scores.withColumn("last_name", getLastName_udf("student_id")).show(10)
그리고 다음은 추적의 마지막 부분입니다 대신 rdd
를 만들고 df
에 그것을 만들기의 이름
data2 = {}
for i in range(len(student_ids)):
data2[student_ids[i]] = last_name[i]
의 쉬운 검색을 위해 사전에 쌍을 변경
Py4JError: An error occurred while calling o114.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344)
at py4j.Gateway.invoke(Gateway.java:252)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:209)
at java.lang.Thread.run(Thread.java:745)
UDF에서'df'에 접근 할 수 없습니다. 왜냐하면 executor에서 처리 될 것이고'df' ref는 드라이버에서만 접근 할 수 있기 때문입니다. 'lastnames'에 브로드 캐스트 변수를 사용할 수 있습니다. 도움이 필요하면 알려주세요. – mrsrinivas
그러나 UDF에서하기보다는'lastnames'를'score'로 합치는 것을 고려하십시오. – mrsrinivas
안녕하세요 @ mrsrinivas, 답장을 보내 주셔서 감사합니다. 첫째, 조인을 사용할 수 없습니다. 비록이 더미 예제가 조인을 사용하여 해결할 수 있지만 실제 구현에서는 UDF 내에서 더 많은 처리를해야하기 때문입니다. 둘째로, 그렇습니다! 이 경우 브로드 캐스트 변수를 어떻게 사용할 수 있습니까? – tohweizhong