2016-11-30 3 views
0

데이터 프레임 열에서 사용자 정의 함수를 실행하려고합니다. 열에 긴 이메일 주소가 포함 된 긴 문자열이 있습니다. 문자열 형식이 같은 것입니다 :PySpark에서 열에 사용자 정의 함수 적용

"Don Joe<[email protected]>, Matt Scheurer <[email protected]>, Dan Lawler <[email protected]>" 

내가 이메일을 추출하는 정규식을 실행했습니다, 그때 나는 열 전체에 얼마나 많은 고유의 이메일 찾을했습니다.

저는 정규식을 작성하고 파이썬에서 고유 한 전자 메일 목록을 만들 수 있습니다. 하지만 스파크 데이터 프레임에이 함수를 적용하는 법을 모르겠습니다. 나는 이런 식으로 시도했다 :

all_names = set() 

def get_distinct_users(userlist): 
    global all_names 
    for email in re.findall('\<\S*\>',userlist): 
     all_names.add(email) 

get_distinct_users_udf = udf(get_distinct_users,StringType()) 
users = users.withColumn("user_count",get_distinct_users_udf(users["users"])) 

그러나 gloabl 변수 all_names가 업데이트되지 않습니다. UDF를 만드는 대신 맵 함수를 적용해야합니까, 아니면 집계 함수의 일종이기 때문에 줄일 수 있습니까?

아이디어가 있으십니까? 이 작업을 수행 할 수

+0

물론 이것은 효과가 없을 것입니다. 각 실행자는'all_names'의 자체 복사본을 얻습니다. 다른 집행자는 그것을 액세스 할 수 없습니다 ... – user4601931

+0

'all_names'을 누적 계산기로 만들면 어떻게 될까요? – anwartheravian

+0

숫자 형식에는 [built in support] (http://spark.apache.org/docs/latest/programming-guide.html#accumulators) 밖에 없지만 직접 만들 수는 있습니다. – user4601931

답변

2

한 가지 방법은

import re 

def get_email(x): 
    return re.findall("\<\S*\>", x) 

uniqueEmails = users.select("users").rdd\ 
    .flatMap(lambda x: get_email(x[0]))\ 
    .distinct() 

는 별개의 이메일 주소의 RDD 것, 예를 들어, 열 통해 전자 메일 주소 목록을 추출하는 flatMap하는 기능이다.