여기에 상황이 있습니다 : GCS에서 압축되어 있으며 .gz 파일 확장자 (예 : 000000_ [0-5] .gz)가 있습니다. 단일 BQ 테이블 명령 줄에서 현재까지 명령을 실행했지만 Dataflow를 사용하여이를 수행하고 잠재적으로 향후 일부 변환을 추가하려고합니다.데이터 흐름 GCS에서 BQ 로의 문제
압축 된 GCS 파일의 데이터는 스키마를 자주 변경하는 복잡한 JSON 구조이므로이라는 하나의 열만있는 TSV로 전체 파일을 BigQuery로 가져 와서 BQ 내에서 JSON_EXTRACT 기능을 사용하는 것이 가장 쉽습니다 필요할 때 필요한 값을 파싱합니다.
문제 :이 시나리오에서는 최소한으로 수행 할 데이터 흐름 파이프 라인을 작성했습니다. GCS에서 읽고 BigQuery 테이블에 쓰기. 나는이 파이프 라인을 실행하면, 그러나, 나는 JSON 구문 분석 오류가 발생하고, 여기에 표시 :
Error while reading data, error message: JSON table encountered too
many errors, giving up. Rows: 1; errors: 1., error: Error while reading
data, error message: JSON table encountered too many errors, giving up.
Rows: 1; errors: 1., error: Error while reading data, error message:
JSON parsing error in row starting at position 2630029539: Value
encountered without start of object.
아래는 익명의 일부 변수 내 데이터 흐름 스크립트입니다.
from __future__ import absolute_import
import argparse
import logging
import re
import json
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.io import WriteToText
from apache_beam.io import Read
from apache_beam.io import WriteToText
from apache_beam.io import WriteToBigQuery
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
def run(argv=None):
parser = argparse.ArgumentParser()
parser.add_argument('--input',
dest='input',
default='gs://BUCKET_NAME/input-data/000000_0.gz',
help='Input file to process.')
known_args, pipeline_args = parser.parse_known_args(argv)
pipeline_args.extend([
'--runner=DataflowRunner',
'--project=GCP_PROJECT_NAME',
'--staging_location=gs://BUCKET_NAME/dataflow-staging',
'--temp_location=gs://BUCKET_NAME/dataflow-temp',
'--job_name=gcs-gzcomp-to-bq1',
])
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = True
with beam.Pipeline(options=pipeline_options) as p:
(p | "ReadFromGCS" >> ReadFromText(known_args.input)
| WriteToBigQuery('TABLE_NAME', dataset='DATASET_NAME',
project='GCP_PROJECT_NAME', schema='record:string'))
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
당신이 볼 수 있듯이
, 나는 내가 문자열 유형 단 하나의 열을 포함하는 스키마를 지정하여, 기존의로드 작업에 뭐하는 거지 같은 일을 시도,하지만 여전히 실패합니다.데이터 흐름에 GCS 파일을 가져 오는 방법에 대한 자세한 내용을 명시 적으로 표시하는 방법이 있습니까? 즉 TSV가 각 행에서 유효한 JSON 객체이더라도 TSV를 지정합니까?
또한이 오류가 내가 망쳐 놨을 수있는 것과 관련이 있다면 전화 해주세요. 저는 Dataflow에 익숙하지 않지만 BQ & 다른 GCP 도구를 꽤 경험했습니다. 따라서 이것을 툴벨에 추가하려고합니다.
와우; 엄청 고마워. 너무 단순하지만 완벽하게 작동했습니다. – andre622