2017-10-11 9 views
1

때 실행 PySpark 코드의 조각 다음사용 UDF

nlp = NLPFunctions() 

def parse_ingredients(ingredient_lines): 
    parsed_ingredients = nlp.getingredients_bulk(ingredient_lines)[0] 
    return list(chain.from_iterable(parsed_ingredients)) 


udf_parse_ingredients = UserDefinedFunction(parse_ingredients, ArrayType(StringType())) 

나는 다음과 같은 오류 얻을 : PySpark이 사용자 정의 클래스를 직렬화 할 수 없기 때문에 _pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects

나는이 상상이 . 그러나 parse_ingredients_line 함수를 실행할 때마다이 값 비싼 오브젝트를 인스턴스화하는 오버 헤드를 피하려면 어떻게해야합니까?

답변

0

편집 :이 대답은 잘못되었습니다. 개체는 여전히 serialize 된 다음 브로드 캐스트 될 때 역 직렬화되므로 직렬화를 피할 수 없습니다.


(Tips for properly using large broadcast variables?)

broadcast variable를 사용해보십시오.

class Identity(object):     
    def __getstate__(self): 
     raise NotImplementedError("Not serializable") 

    def identity(self, x): 
     return x 

당신이 예를 들어 호출 객체 (f.py)를 사용할 수 있으며, 클래스 멤버로 Identity 인스턴스를 저장 :

sc = SparkContext() 
nlp_broadcast = sc.broadcast(nlp) # Stores nlp in de-serialized format. 

def parse_ingredients(ingredient_lines): 
    parsed_ingredients = nlp_broadcast.value.getingredients_bulk(ingredient_lines)[0] 
    return list(chain.from_iterable(parsed_ingredients)) 
+0

이 제안 된 해결책은 나에게도 같은 오류를줍니다. –

1

의 당신이 다음과 같이 정의 Identity 클래스 (identity.py)를 사용하고 싶은 말은하자 :

from identity import Identity 

class F(object):       
    identity = None 

    def __call__(self, x): 
     if not F.identity: 
      F.identity = Identity() 
     return F.identity.identity(x) 

아래와 같이 다음을 사용 :

,369을
from pyspark.sql.functions import udf 
import f 

sc.addPyFile("identity.py") 
sc.addPyFile("f.py") 

f_ = udf(f.F()) 

spark.range(3).select(f_("id")).show() 
+-----+ 
|F(id)| 
+-----+ 
| 0| 
| 1| 
| 2| 
+-----+ 

또는 독립형 기능과 폐쇄 :

from pyspark.sql.functions import udf 
import identity 

sc.addPyFile("identity.py") 

def f(): 
    dict_ = {}     
    @udf()    
    def f_(x):     
     if "identity" not in dict_: 
      dict_["identity"] = identity.Identity() 
     return dict_["identity"].identity(x) 
    return f_ 


spark.range(3).select(f()("id")).show() 
+------+ 
|f_(id)| 
+------+ 
|  0| 
|  1| 
|  2| 
+------+