1

어떤 속성이없는 나는PySpark 오류 : AttributeError는 'NoneType'객체 '_jvm'

의 형식으로 타임 스탬프 데이터 집합을 가지고 있고이 데이터 집합을 처리하고 키 값의지도로 돌아 pyspark에서 UDF를 작성했습니다 . 그러나 오류 메시지 아래에 점점.

데이터 집합 : df_ts_list

+--------------------+ 
|    ts_list| 
+--------------------+ 
|[1477411200, 1477...| 
|[1477238400, 1477...| 
|[1477022400, 1477...| 
|[1477224000, 1477...| 
|[1477256400, 1477...| 
|[1477346400, 1476...| 
|[1476986400, 1477...| 
|[1477321200, 1477...| 
|[1477306800, 1477...| 
|[1477062000, 1477...| 
|[1477249200, 1477...| 
|[1477040400, 1477...| 
|[1477090800, 1477...| 
+--------------------+ 

Pyspark UDF :

>>> def on_time(ts_list): 
...  import sys 
...  import os 
...  sys.path.append('/usr/lib/python2.7/dist-packages') 
...  os.system("sudo apt-get install python-numpy -y") 
...  import numpy as np 
...  import datetime 
...  import time 
...  from datetime import timedelta 
...  ts = np.array(ts_list) 
...  if ts.size == 0: 
...    count = 0 
...    duration = 0 
...    st = time.mktime(datetime.now()) 
...    ymd = str(datetime.fromtimestamp(st).date()) 
...  else: 
...    ts.sort() 
...    one_tag = [] 
...    start = float(ts[0]) 
...    for i in range(len(ts)): 
...      if i == (len(ts)) - 1: 
...        end = float(ts[i]) 
...        a_round = [start, end] 
...        one_tag.append(a_round) 
...      else: 
...        diff = (datetime.datetime.fromtimestamp(float(ts[i+1])) - datetime.datetime.fromtimestamp(float(ts[i]))) 
...        if abs(diff.total_seconds()) > 3600: 
...          end = float(ts[i]) 
...          a_round = [start, end] 
...          one_tag.append(a_round) 
...          start = float(ts[i+1]) 
...    one_tag = [u for u in one_tag if u[1] - u[0] > 300] 
...    count = int(len(one_tag)) 
...    duration = int(np.diff(one_tag).sum()) 
...    ymd = str(datetime.datetime.fromtimestamp(time.time()).date()) 
...  return {'count':count,'duration':duration, 'ymd':ymd} 

Pyspark 코드 :

>>> on_time=udf(on_time, MapType(StringType(),StringType())) 
>>> df_ts_list.withColumn("one_tag", on_time("ts_list")).select("one_tag").show() 

오류 :

Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last): 
    File "/usr/lib/spark/python/pyspark/worker.py", line 172, in main 
    process() 
    File "/usr/lib/spark/python/pyspark/worker.py", line 167, in process 
    serializer.dump_stream(func(split_index, iterator), outfile) 
    File "/usr/lib/spark/python/pyspark/worker.py", line 106, in <lambda> 
    func = lambda _, it: map(mapper, it) 
    File "/usr/lib/spark/python/pyspark/worker.py", line 92, in <lambda> 
    mapper = lambda a: udf(*a) 
    File "/usr/lib/spark/python/pyspark/worker.py", line 70, in <lambda> 
    return lambda *a: f(*a) 
    File "<stdin>", line 27, in on_time 
    File "/usr/lib/spark/python/pyspark/sql/functions.py", line 39, in _ 
    jc = getattr(sc._jvm.functions, name)(col._jc if isinstance(col, Column) else col) 
AttributeError: 'NoneType' object has no attribute '_jvm' 

도움이 될 것입니다!

답변

5

udf의 27 번째 줄에서 일부 pyspark SQL 함수를 호출한다는 오류 메시지가 표시됩니다. 그것은 abs()과 일치하므로 어딘가에 from pyspark.sql.functions import *을 호출하고 파이썬의 abs() 함수를 오버라이드한다고 가정합니다.

0

마리우스 대답이 정말로 도움이되지 않았습니다. 그래서 당신이 나를 좋아한다면 그것이 구글의 유일한 결과이고 당신은 pyspark (그리고 일반적으로 스파크)에 처음이기 때문에 이것을 발견했다면, 나를 위해 일한 것이 여기에 있습니다.

내 경우에는 pyspark 환경이 설정되기 전에 pyspark 코드를 실행하려고했기 때문에이 오류가 발생했습니다.

pyspark.sql.functions에 의존하여 전화를 걸기 전에 pyspark를 사용할 수 있는지 확인하고 문제가 해결되었습니다.