2016-08-22 4 views
4

현재 Apache Beam을 사용하여 파이썬에서 gzip 파일을 읽을 수 있습니까? 내 파이프 라인이 코드 라인 GCS에서 GZIP 파일을 당기고 :파이썬에서 gzip 파일 열기 아파치 빔

beam.io.Read(beam.io.TextFileSource('gs://bucket/file.gz', compression_type='GZIP')) 

그러나 나는이 오류가 무엇입니까 : 우리는 파일을 압축 파이썬 빔 소스 코드에서 발견

UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 1: invalid start byte 

될 것을 싱크대에 쓸 때 처리됩니다. https://github.com/apache/incubator-beam/blob/python-sdk/sdks/python/apache_beam/io/fileio.py#L445

더 자세한 역 추적 :

Traceback (most recent call last): 
    File "beam-playground.py", line 11, in <module> 
    p.run() 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 159, in run 
    return self.runner.run(self) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 103, in run 
    super(DirectPipelineRunner, self).run(pipeline) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 98, in run 
    pipeline.visit(RunVisitor(self)) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 182, in visit 
    self._root_transform().visit(visitor, self, visited) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 419, in visit 
    part.visit(visitor, pipeline, visited) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/pipeline.py", line 422, in visit 
    visitor.visit_transform(self) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 93, in visit_transform 
    self.runner.run_transform(transform_node) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/runner.py", line 168, in run_transform 
    return m(transform_node) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 99, in func_wrapper 
    func(self, pvalue, *args, **kwargs) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 258, in run_Read 
    read_values(reader) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/runners/direct_runner.py", line 245, in read_values 
    read_result = [GlobalWindows.windowed_value(e) for e in reader] 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/io/fileio.py", line 807, in __iter__ 
    yield self.source.coder.decode(line) 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/site-packages/apache_beam/coders/coders.py", line 187, in decode 
    return value.decode('utf-8') 
    File "/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/encodings/utf_8.py", line 16, in decode 
    return codecs.utf_8_decode(input, errors, True) 
UnicodeDecodeError: 'utf8' codec can't decode byte 0x8b in position 1: invalid start byte 

답변

2

UPDATE : TextIO 파이썬 SDK에 이제 압축 파일에서 읽기를 지원합니다.

파이썬 SDK의 TextIO은 실제로 압축 파일에서 읽는 것을 지원하지 않습니다.

+0

빠른 응답을 보내 주셔서 감사합니다. 나는 그 기사를보고 훑어 보겠다. 질문에 전체 추적을 추가했습니다. – agsolid

+0

데이터가 UTF-8로 압축됩니다. 그리고 원본 텍스트를 바이트에서 유니 코드로 디코딩하여 확인했습니다. 오류가 없습니다.잘못된 것이지만 빔 소스 코드를 보면 TextFileSource가 압축 파일을 처리하지 못하는 것처럼 보입니다. – agsolid

+1

나는 더 깊이 바라 보았습니다. 나는 깊이의 지원에 대해 틀 렸습니다. 이 클래스는 실제 지원을 위해 compression_type 매개 변수를 허용합니다. –

3

비슷한 문제가 발생했습니다. 내가 파싱하고 데이터를 가져 오려는 사용자 정의 이진 소스가 있습니다. 문제는 file.io API가 CSV 또는 ARVO를 기반으로하며 어떤 시도를 하던지 나에게 라인을 나누지 않고 라인을 제공하지 않는다는 것입니다. 상상할 수 있듯이 바이너리 파일은이 문제를 잘 처리하지 못합니다.

처음에는 사용자 지정 소스를 사용해 보았습니다. 구현을 위해 3 개의 클래스가 길어졌고 핵심 Dataflow/Beam 코드가 중복되었습니다. 마지막으로 필자는 필자가 필요로했던 것을 얻으려는이 멋진 파이널 코드를 코딩했다.

import apache_beam as beam 
from apache_beam.io.fileio import coders 

def _TextFileReader__iter(self): 
    # The full data file is had here and can be read like normal 
    # You can even limit the character bit here. (I did 9 to grab the file format) 
    data = self._file.read() 
    # Now you can either yield the whole file as a single data entry 
    # and run a ParDo to split it, or you can iterate in here and 
    # yield each row. I chose the latter, but I'm showing an example 
    # of the former. 
    yield data 

# This monkeypatch good! 
beam.io.fileio.TextFileReader.__iter__ = _TextFileReader__iter 

이 소스를 호출하고 있는지 그것의가 BINARY 만들려면, 나는 한 다음

pipeline | 'start_3' >> beam.io.Read(
    beam.io.TextFileSource('gs://MY_BUCKET/sample.bin', 
     coder=coders.BytesCoder() 
    ) 
) 

을 주목 coders.BytesCoders()? 그것 없이는 바이트를 파싱 엔진에 좋지 않은 이진이 아닌 것으로 변환하려고 시도했습니다. ;)

좋은 하루를내어 알아 냈습니다. 그러나이 방법을 사용하면 Dataflow의 file.io 클래스로 거의 모든 작업을 수행 할 수 있습니다. ;)

1

동일한 문제가 발생했습니다. 나는 GCS에서 이진 GZ 파일을 읽고, 압축을 풀고, 처리를 위해 다른 곳으로 보내려고했다. 나는 그것을 두 단계로 풀었다.

먼저 올바른 Python 라이브러리를 사용하고 있는지 확인하십시오. 내 원본 라이브러리는 오래되었습니다 (적어도 v0.4를 사용합니다) : pip install --upgrade google-cloud-dataflow.

import apache_beam as beam 
from apache_beam import (coders, io, transforms) 

raw_logs = (p 
      | io.Read("ReadLogsFromGCS", beam.io.TextFileSource(
         "gs://my-bucket/logs-*.gz", 
         coder=coders.BytesCoder())) 
      | transforms.Map(lambda x: x) 
      | io.Write("WriteToLocalhost", io.textio.WriteToText(
         "/tmp/flattened-logs", 
         file_name_suffix=".json"))) 
p.run() 

당신은 파일이 파이프 라인을 실행 한 후 /tmp/flattened-logs.json라고해야 다음과 같이

둘째, 내 파이프 라인을 건설했다.

관련 문제