2

다른 데이터 프레임에서 UDF를 실행할 때 pyspark 데이터 프레임을 어떻게 참조합니까?Pyspark : 다른 데이터 프레임의 UDF에서 데이터 프레임을 참조하는 방법은 무엇입니까?

여기에 가짜 예제가 있습니다. scoreslastnames이라는 두 개의 데이터 프레임을 만들고 두 개의 데이터 프레임에서 동일한 열이 있습니다. scores에 적용된 UDF에서 lastnames을 필터링하고 lastname에있는 문자열을 반환하고 싶습니다.

from pyspark import SparkContext 
from pyspark import SparkConf 
from pyspark.sql import SQLContext 
from pyspark.sql.types import * 

sc = SparkContext("local") 
sqlCtx = SQLContext(sc) 


# Generate Random Data 
import itertools 
import random 
student_ids = ['student1', 'student2', 'student3'] 
subjects = ['Math', 'Biology', 'Chemistry', 'Physics'] 
random.seed(1) 
data = [] 

for (student_id, subject) in itertools.product(student_ids, subjects): 
    data.append((student_id, subject, random.randint(0, 100))) 

from pyspark.sql.types import StructType, StructField, IntegerType, StringType 
schema = StructType([ 
      StructField("student_id", StringType(), nullable=False), 
      StructField("subject", StringType(), nullable=False), 
      StructField("score", IntegerType(), nullable=False) 
    ]) 

# Create DataFrame 
rdd = sc.parallelize(data) 
scores = sqlCtx.createDataFrame(rdd, schema) 

# create another dataframe 
last_name = ["Granger", "Weasley", "Potter"] 
data2 = [] 
for i in range(len(student_ids)): 
    data2.append((student_ids[i], last_name[i])) 

schema = StructType([ 
      StructField("student_id", StringType(), nullable=False), 
      StructField("last_name", StringType(), nullable=False) 
    ]) 

rdd = sc.parallelize(data2) 
lastnames = sqlCtx.createDataFrame(rdd, schema) 


scores.show() 
lastnames.show() 


from pyspark.sql.functions import udf 
def getLastName(sid): 
    tmp_df = lastnames.filter(lastnames.student_id == sid) 
    return tmp_df.last_name 

getLastName_udf = udf(getLastName, StringType()) 
scores.withColumn("last_name", getLastName_udf("student_id")).show(10) 

그리고 다음은 추적의 마지막 부분입니다 대신 rdd를 만들고 df에 그것을 만들기의 이름

data2 = {} 
for i in range(len(student_ids)): 
    data2[student_ids[i]] = last_name[i] 

의 쉬운 검색을 위해 사전에 쌍을 변경

Py4JError: An error occurred while calling o114.__getnewargs__. Trace: 
py4j.Py4JException: Method __getnewargs__([]) does not exist 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:335) 
    at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:344) 
    at py4j.Gateway.invoke(Gateway.java:252) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:209) 
    at java.lang.Thread.run(Thread.java:745) 
+0

UDF에서'df'에 접근 할 수 없습니다. 왜냐하면 executor에서 처리 될 것이고'df' ref는 드라이버에서만 접근 할 수 있기 때문입니다. 'lastnames'에 브로드 캐스트 변수를 사용할 수 있습니다. 도움이 필요하면 알려주세요. – mrsrinivas

+0

그러나 UDF에서하기보다는'lastnames'를'score'로 합치는 것을 고려하십시오. – mrsrinivas

+0

안녕하세요 @ mrsrinivas, 답장을 보내 주셔서 감사합니다. 첫째, 조인을 사용할 수 없습니다. 비록이 더미 예제가 조인을 사용하여 해결할 수 있지만 실제 구현에서는 UDF 내에서 더 많은 처리를해야하기 때문입니다. 둘째로, 그렇습니다! 이 경우 브로드 캐스트 변수를 어떻게 사용할 수 있습니까? – tohweizhong

답변

2

브로드 캐스트 변수 생성

//rdd = sc.parallelize(data2) 
//lastnames = sqlCtx.createDataFrame(rdd, schema) 
lastnames = sc.broadcast(data2) 

브로드 캐스트 변수 (lastnames)에 values attr이있는 udf에 액세스하십시오.

from pyspark.sql.functions import udf 
def getLastName(sid): 
    return lastnames.value[sid] 
+1

구현을 ** 방송 변수 **로 수정했습니다. 최대한 많은 순수한 함수로 UDF를 만들려고하면 너무 많은 외부 의존성으로 인해 성능이 저하 될 수 있습니다. – mrsrinivas

+0

'lastnames.value'를 보면'student1 ','Granger ','student2 ','Weasley ','student3 ','Potter '를 얻을 수 있습니다. ]', 이는'lastnames.value.filter'가 더 이상 제대로 작동하지 않는다는 것을 의미합니까? – tohweizhong

+0

예. udf에서'lastnames.value [ "sid"]'를 실행하고'sid'를 키로, 값을'lastname'으로하여 사전 (변수'data2')을 생성하십시오. – mrsrinivas

2

UDF 내부에서 직접 데이터 프레임 (또는 RDD)을 참조 할 수 없습니다. DataFrame 개체는 드라이버에서 클러스터에서 발생할 데이터와 동작을 나타내는 데 사용되는 핸들입니다. Spark가 선택할 때 UDF의 코드가 클러스터에서 실행됩니다. Spark은 코드를 직렬화하고 클로저에 포함 된 모든 변수의 복사본을 만들어 각 작업자에게 보냄으로써이를 수행합니다.

대신 Spark에서 제공하는 API를 사용하여 두 개의 DataFrames를 조인/결합하는 방법을 사용하고 싶습니다. 데이터 세트 중 하나가 작 으면 수동으로 브로드 캐스트 변수의 데이터를 송신 한 다음 UDF에서 액세스 할 수 있습니다. 그렇지 않은 경우처럼 두 데이터 프레임을 만든 다음 조인 작업을 사용하여 결합 할 수 있습니다. 이런 식으로 뭔가 작업을해야합니다 :

joined = scores.withColumnRenamed("student_id", "join_id") 
joined = joined.join(lastnames, joined.join_id == lastnames.student_id)\ 
       .drop("join_id") 
joined.show() 

+---------+-----+----------+---------+ 
| subject|score|student_id|last_name| 
+---------+-----+----------+---------+ 
|  Math| 13| student1| Granger| 
| Biology| 85| student1| Granger| 
|Chemistry| 77| student1| Granger| 
| Physics| 25| student1| Granger| 
|  Math| 50| student2| Weasley| 
| Biology| 45| student2| Weasley| 
|Chemistry| 65| student2| Weasley| 
| Physics| 79| student2| Weasley| 
|  Math| 9| student3| Potter| 
| Biology| 2| student3| Potter| 
|Chemistry| 84| student3| Potter| 
| Physics| 43| student3| Potter| 
+---------+-----+----------+---------+ 

을 그것은 후드 스파크 DataFrames에서 최적화를 가지고, 또한 주목할만한 가치가 어디에가있는 경우 셔플을 피하기 위해 방송 변수로 변환 할 수있는 참여의 일부인 DataFrame 충분히 작습니다. 따라서 위에 나열된 조인 방법을 수행하는 경우 더 큰 데이터 세트를 처리하는 기능을 희생하지 않고 가능한 최상의 성능을 얻어야합니다.

관련 문제