Spark 2.0에서 약 5 천만 개의 dict가있는 rdd가 있습니다. 그것들은 꽤 작으며 약 12Gb의 메모리 만 차지합니다 (Spark 웹 UI의 Storage 탭 당). 나는이 RDD에서 원하는 모든 처리 과정을 훑어 보았고 이제는이 데이터를 다른 시스템에 공급해야하기 때문에 Spark에서 꺼내려고합니다.Spark - Python에서 데이터 가져 오기
나는 아무데도 가지 않고 도움이 필요합니다. 이상적으로, 모든 파티션을 드라이버로 보내고 다른 파이썬 모듈을 통해 로컬로 데이터를 덤프하도록하는 것이 이상적입니다. 최소한의 추가 코딩 만 필요합니다.
나는 이런 식으로 뭔가가 작동 것이라고 기대했다:
for x in processed_data.toDF().toLocalIterator():
index.add(x)
그러나 기쁨, 나는이 편리한 스택 추적을 가지고 :
<ipython-input-20-b347e9bd2075> in <module>()
----> 1 for x in processed_data.toDF().toLocalIterator():
2 index.add(x)
/apps/spark2/python/pyspark/rdd.py in _load_from_socket(port, serializer)
140 try:
141 rf = sock.makefile("rb", 65536)
--> 142 for item in serializer.load_stream(rf):
143 yield item
144 finally:
/apps/spark2/python/pyspark/serializers.py in load_stream(self, stream)
137 while True:
138 try:
--> 139 yield self._read_with_length(stream)
140 except EOFError:
141 return
/apps/spark2/python/pyspark/serializers.py in _read_with_length(self, stream)
154
155 def _read_with_length(self, stream):
--> 156 length = read_int(stream)
157 if length == SpecialLengths.END_OF_DATA_SECTION:
158 raise EOFError
/apps/spark2/python/pyspark/serializers.py in read_int(stream)
541
542 def read_int(stream):
--> 543 length = stream.read(4)
544 if not length:
545 raise EOFError
/usr/lib/python3.4/socket.py in readinto(self, b)
372 while True:
373 try:
--> 374 return self._sock.recv_into(b)
375 except timeout:
376 self._timeout_occurred = True
timeout: timed out
나는 모든 로그 파일을 확인을하고 난 아무 생각이 없다 그것이 무엇 일 수 있는지. 나는 더 작은 파티션과 여전히 운이 없도록 rdd를 다시 파티션하려고 시도했습니다.
ExecutorLostFailure (executor 3 exited caused by one of the running
tasks) Reason: Remote RPC client disassociated. Likely due to
containers exceeding thresholds, or network issues. Check driver logs
for WARN messages.
내가 로그를 확인하고도 문제가 표시되지 않은 : 내 드라이버가 RAM 40GB의 할당에 대해, 나는 다음을 수집하기 위해 노력하고 있기 때문에
은, 그때 이들의 무리를 받기 시작했다. 심지어 원격으로 HDFS에 밖으로 DF를 쓰고 실행을 완료 유일한 :
processed_data.toDF().write.json()
문제가 다음 난 그냥 각 개체 후 쉼표 같은 적절한 JSON 구문없이 데이터의 덤프를 얻을 수 있다는 것입니다 .. 그러나 ..
내가 여기에 뭔가를 놓친가요? 이것은 더 작은 데이터 세트로 이것을 시도했기 때문에 정말 실망스럽고 toLocalIterator는 훌륭하게 작동했습니다. 사전에
덕분에
이것은 핵심적인 문제가 아닌 증상 일 가능성이 큽니다. 또한 수집하기 전에'DataFrame'으로 변환하는 이유는 무엇입니까? 그것은 전혀 이해가되지 않습니다. – zero323
나는 rdd가 데이터 프레임에 write 메소드를 가지고 있다고 생각하지 않는다. 어떤 문제를 해결하기위한 좋은 방법은 무엇입니까? – browskie
그것은 많은 쓰기 메소드를 제공하며, 쓰기를 수행하지 않을 때 변환하는 이유를 여전히 설명하지 않습니다. Re O'DataFrame.write.json' - 한 줄에 유효한 JSON 문서를 제공합니다. 그것을 읽는 데 문제가 없어야합니다. – zero323