0

여기에 상황이 있습니다 : GCS에서 압축되어 있으며 .gz 파일 확장자 (예 : 000000_ [0-5] .gz)가 있습니다. 단일 BQ 테이블 명령 줄에서 현재까지 명령을 실행했지만 Dataflow를 사용하여이를 수행하고 잠재적으로 향후 일부 변환을 추가하려고합니다.데이터 흐름 GCS에서 BQ 로의 문제

압축 된 GCS 파일의 데이터는 스키마를 자주 변경하는 복잡한 JSON 구조이므로이라는 하나의 열만있는 TSV로 전체 파일을 BigQuery로 가져 와서 BQ 내에서 JSON_EXTRACT 기능을 사용하는 것이 가장 쉽습니다 필요할 때 필요한 값을 파싱합니다.

문제 :이 시나리오에서는 최소한으로 수행 할 데이터 흐름 파이프 라인을 작성했습니다. GCS에서 읽고 BigQuery 테이블에 쓰기. 나는이 파이프 라인을 실행하면, 그러나, 나는 JSON 구문 분석 오류가 발생하고, 여기에 표시 :

Error while reading data, error message: JSON table encountered too 
many errors, giving up. Rows: 1; errors: 1., error: Error while reading 
data, error message: JSON table encountered too many errors, giving up. 
Rows: 1; errors: 1., error: Error while reading data, error message: 
JSON parsing error in row starting at position 2630029539: Value 
encountered without start of object. 

아래는 익명의 일부 변수 내 데이터 흐름 스크립트입니다.

from __future__ import absolute_import 

import argparse 
import logging 
import re 
import json 

import apache_beam as beam 
from apache_beam.io import ReadFromText 
from apache_beam.io import WriteToText 
from apache_beam.io import Read 
from apache_beam.io import WriteToText 
from apache_beam.io import WriteToBigQuery 
from apache_beam.options.pipeline_options import PipelineOptions 
from apache_beam.options.pipeline_options import SetupOptions 

def run(argv=None): 

    parser = argparse.ArgumentParser() 
    parser.add_argument('--input', 
         dest='input', 
         default='gs://BUCKET_NAME/input-data/000000_0.gz', 
         help='Input file to process.') 
    known_args, pipeline_args = parser.parse_known_args(argv) 
    pipeline_args.extend([ 
     '--runner=DataflowRunner', 
     '--project=GCP_PROJECT_NAME', 
     '--staging_location=gs://BUCKET_NAME/dataflow-staging', 
     '--temp_location=gs://BUCKET_NAME/dataflow-temp', 
     '--job_name=gcs-gzcomp-to-bq1', 
    ]) 

    pipeline_options = PipelineOptions(pipeline_args) 
    pipeline_options.view_as(SetupOptions).save_main_session = True 
    with beam.Pipeline(options=pipeline_options) as p: 

    (p | "ReadFromGCS" >> ReadFromText(known_args.input) 
     | WriteToBigQuery('TABLE_NAME', dataset='DATASET_NAME', 
      project='GCP_PROJECT_NAME', schema='record:string')) 

if __name__ == '__main__': 
    logging.getLogger().setLevel(logging.INFO) 
    run() 

당신이 볼 수 있듯이

, 나는 내가 문자열 유형 단 하나의 열을 포함하는 스키마를 지정하여, 기존의로드 작업에 뭐하는 거지 같은 일을 시도,하지만 여전히 실패합니다.

데이터 흐름에 GCS 파일을 가져 오는 방법에 대한 자세한 내용을 명시 적으로 표시하는 방법이 있습니까? 즉 TSV가 각 행에서 유효한 JSON 객체이더라도 TSV를 지정합니까?

또한이 오류가 내가 망쳐 놨을 수있는 것과 관련이 있다면 전화 해주세요. 저는 Dataflow에 익숙하지 않지만 BQ & 다른 GCP 도구를 꽤 경험했습니다. 따라서 이것을 툴벨에 추가하려고합니다.

답변

0

WriteToBigQuery의 입력 컬렉션은 문자열 모음이 아닌 사전 모음 (각 키는 BigQuery 열에 매핑 됨)이어야한다고 생각합니다. | beam.Map(lambda line: dict(record=line))과 같은 것을 시도해보십시오.

+0

와우; 엄청 고마워. 너무 단순하지만 완벽하게 작동했습니다. – andre622

관련 문제