2016-10-01 5 views
0

Spark 2.0에서 약 5 천만 개의 dict가있는 rdd가 있습니다. 그것들은 꽤 작으며 약 12Gb의 메모리 만 차지합니다 (Spark 웹 UI의 Storage 탭 당). 나는이 RDD에서 원하는 모든 처리 과정을 훑어 보았고 이제는이 데이터를 다른 시스템에 공급해야하기 때문에 Spark에서 꺼내려고합니다.Spark - Python에서 데이터 가져 오기

나는 아무데도 가지 않고 도움이 필요합니다. 이상적으로, 모든 파티션을 드라이버로 보내고 다른 파이썬 모듈을 통해 로컬로 데이터를 덤프하도록하는 것이 이상적입니다. 최소한의 추가 코딩 만 필요합니다.

나는 이런 식으로 뭔가가 작동 것이라고 기대했다

:

for x in processed_data.toDF().toLocalIterator(): 
    index.add(x) 

그러나 기쁨, 나는이 편리한 스택 추적을 가지고 :

<ipython-input-20-b347e9bd2075> in <module>() 
----> 1 for x in processed_data.toDF().toLocalIterator(): 
     2  index.add(x) 

/apps/spark2/python/pyspark/rdd.py in _load_from_socket(port, serializer) 
    140  try: 
    141   rf = sock.makefile("rb", 65536) 
--> 142   for item in serializer.load_stream(rf): 
    143    yield item 
    144  finally: 

/apps/spark2/python/pyspark/serializers.py in load_stream(self, stream) 
    137   while True: 
    138    try: 
--> 139     yield self._read_with_length(stream) 
    140    except EOFError: 
    141     return 

/apps/spark2/python/pyspark/serializers.py in _read_with_length(self, stream) 
    154 
    155  def _read_with_length(self, stream): 
--> 156   length = read_int(stream) 
    157   if length == SpecialLengths.END_OF_DATA_SECTION: 
    158    raise EOFError 

/apps/spark2/python/pyspark/serializers.py in read_int(stream) 
    541 
    542 def read_int(stream): 
--> 543  length = stream.read(4) 
    544  if not length: 
    545   raise EOFError 

/usr/lib/python3.4/socket.py in readinto(self, b) 
    372   while True: 
    373    try: 
--> 374     return self._sock.recv_into(b) 
    375    except timeout: 
    376     self._timeout_occurred = True 

timeout: timed out 

나는 모든 로그 파일을 확인을하고 난 아무 생각이 없다 그것이 무엇 일 수 있는지. 나는 더 작은 파티션과 여전히 운이 없도록 rdd를 다시 파티션하려고 시도했습니다.

ExecutorLostFailure (executor 3 exited caused by one of the running 
tasks) Reason: Remote RPC client disassociated. Likely due to 
containers exceeding thresholds, or network issues. Check driver logs 
for WARN messages. 

내가 로그를 확인하고도 문제가 표시되지 않은 : 내 드라이버가 RAM 40GB의 할당에 대해, 나는 다음을 수집하기 위해 노력하고 있기 때문에

은, 그때 이들의 무리를 받기 시작했다. 심지어 원격으로 HDFS에 밖으로 DF를 쓰고 실행을 완료 유일한 :

processed_data.toDF().write.json() 

문제가 다음 난 그냥 각 개체 후 쉼표 같은 적절한 JSON 구문없이 데이터의 덤프를 얻을 수 있다는 것입니다 .. 그러나 ..

내가 여기에 뭔가를 놓친가요? 이것은 더 작은 데이터 세트로 이것을 시도했기 때문에 정말 실망스럽고 toLocalIterator는 훌륭하게 작동했습니다. 사전에

덕분에

+0

이것은 핵심적인 문제가 아닌 증상 일 가능성이 큽니다. 또한 수집하기 전에'DataFrame'으로 변환하는 이유는 무엇입니까? 그것은 전혀 이해가되지 않습니다. – zero323

+0

나는 rdd가 데이터 프레임에 write 메소드를 가지고 있다고 생각하지 않는다. 어떤 문제를 해결하기위한 좋은 방법은 무엇입니까? – browskie

+0

그것은 많은 쓰기 메소드를 제공하며, 쓰기를 수행하지 않을 때 변환하는 이유를 여전히 설명하지 않습니다. Re O'DataFrame.write.json' - 한 줄에 유효한 JSON 문서를 제공합니다. 그것을 읽는 데 문제가 없어야합니다. – zero323

답변

0

나는이 알려진 버그 이해 : https://issues.apache.org/jira/browse/SPARK-18281

그것은 버전에서 해결해야

2.0.3과 2.1.1 (그들 중 누구도 아직 발표되지 않고, 2.1가 보인다 여전히 버그가있다.)

그동안 메모리가 문제가되지 않는다면 toLocalIteratorcollect으로 대체해야합니다.