0
스파크 (pyspark)를 사용하여 데이터를 읽었으며 일부 데이터가 .gz 형식이므로 걸림 거리가 발생했습니다.스파크가 S3에서 압축을 풉니 다
%pyspark
data = sc.textFile("s3://mybucket.file.gz")
data.first()
Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5205743886772607083.py", line 267, in <module>
raise Exception(traceback.format_exc())
Exception: Traceback (most recent call last):
File "/tmp/zeppelin_pyspark-5205743886772607083.py", line 260, in <module>
exec(code)
File "<stdin>", line 1, in <module>
File "/usr/lib/spark/python/pyspark/rdd.py", line 1041, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/lib/spark/python/pyspark/rdd.py", line 1032, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "/usr/lib/spark/python/pyspark/rdd.py", line 906, in fold
vals = self.mapPartitions(func).collect()
File "/usr/lib/spark/python/pyspark/rdd.py", line 809, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 319, in get_return_value
format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 31.0 failed 4 times, most recent failure: Lost task 0.3 in stage 31.0 (TID 270, ip-172-16-238-231.us-west-1.compute.internal, executor 17): java.io.IOException: incorrect header check
at org.apache.hadoop.io.compress.zlib.ZlibDecompressor.inflateBytesDirect(Native Method)
이 내용을 읽고 압축을 푸는 방법에 대한 의견이 있으십니까?
어떤 오류가 발생합니까? –
다음 문서를 참조하십시오. https://github.com/bernhard-42/spark-unzip#2-gzip-compressed-data – flyingmeatball