1

데이터 흐름을 사용하여 pubsub 메시지를 읽고 큰 쿼리에 쓰려고합니다. Google 팀에서 알파 액세스 권한을 부여 받았고 제공된 예제가 작동했지만 이제는 내 시나리오에 적용해야합니다.Python SDK를 사용하여 데이터 흐름 스트리밍 : PubSub 메시지를 BigQuery 출력으로 변환

Pubsub 페이로드 :

Message { 
    data: {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1} 
    attributes: {} 
} 

빅 쿼리 스키마 :

schema='mac:STRING, status:INTEGER, datetime:TIMESTAMP', 

내 목표는 단순히 메시지 페이로드를 읽고 BigQuery에 삽입하는 것입니다. 변환에 대해 머리 글자를 쓰는 데 어려움을 겪고 있으며 키/값을 큰 쿼리 스키마에 매핑해야합니다.

나는 매우 새로운 것이므로 어떤 도움을 주시면 감사하겠습니다.

현재 코드 : https://codeshare.io/ayqX8w

감사합니다!

답변

0

파이썬 SDK의 BigQuery 싱크에 기록 된 데이터는 사전의 각 키가 BigQuery 테이블의 필드를 제공하고 해당 값이 해당 필드에 기록 될 값을 제공하는 사전 형식이어야합니다. BigQuery RECORD 유형의 경우, 값 자체는 해당 키, 값 쌍을 갖는 사전이어야합니다.

내가 빔에 대응하는 파이썬 모듈의 문서를 개선하기 위해 JIRA을 제기 : https://issues.apache.org/jira/browse/BEAM-3090

+0

의견에 감사드립니다. 좀 더 실험을하면 들어오는 pub/sub 메시지가 문자열 (분명히)으로 들어오는 것처럼 보입니다. 선 객체를 사전으로 변환하는 변환을 적용해야합니다. 데이터 흐름에서 발생한 오류 메시지는 다음과 같습니다. ** 입력 유형 힌트 위반 : 예상되는 튜플 [TypeVariable [K], TypeVariable [V]], ** – glux

1

나는 성공적으로 JSON 객체로로드 함수를 정의하여 pubsub 문자열을 구문 분석 할 수 있었다 (parse_pubsub()를 참조). 내가 직면 한 이상한 문제는 글로벌 범위에서 json을 가져올 수 없다는 것입니다. "NameError : 글로벌 이름 'json'이 정의되지 않았습니다."오류가 발생했습니다. 함수 내에서 json을 가져와야했습니다.

from __future__ import absolute_import 

import logging 
import argparse 
import apache_beam as beam 
import apache_beam.transforms.window as window 

'''Normalize pubsub string to json object''' 
# Lines look like this: 
    # {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1} 
def parse_pubsub(line): 
    import json 
    record = json.loads(line) 
    return (record['mac']), (record['status']), (record['datetime']) 

def run(argv=None): 
    """Build and run the pipeline.""" 

    parser = argparse.ArgumentParser() 
    parser.add_argument(
     '--input_topic', required=True, 
     help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".') 
    parser.add_argument(
     '--output_table', required=True, 
     help= 
     ('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE ' 
     'or DATASET.TABLE.')) 
    known_args, pipeline_args = parser.parse_known_args(argv) 

    with beam.Pipeline(argv=pipeline_args) as p: 
    # Read the pubsub topic into a PCollection. 
    lines = (p | beam.io.ReadStringsFromPubSub(known_args.input_topic) 
       | beam.Map(parse_pubsub) 
       | beam.Map(lambda (mac_bq, status_bq, datetime_bq): {'mac': mac_bq, 'status': status_bq, 'datetime': datetime_bq}) 
       | beam.io.WriteToBigQuery(
        known_args.output_table, 
        schema=' mac:STRING, status:INTEGER, datetime:TIMESTAMP', 
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, 
        write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND) 
      ) 

if __name__ == '__main__': 
    logging.getLogger().setLevel(logging.INFO) 
    run() 
0

나는 비슷한 유스 케이스 (을 처리 한 후, PubSub에서 문자열로 행을 읽어 dicts로 변환 등)이 있습니다

아래 내 작업 코드를 참조하십시오.

나는 ast.literal_eval()을 사용하고 있는데, 저에게는 효과가있는 것 같습니다. 이 명령은 문자열을 평가하지만 eval()보다 안전합니다 (here 참조). 그것은 키가 문자열 인 dict을 반환해야하며, 값은 가장 가능성있는 유형 (int, str, float ...)으로 평가됩니다. 그러나 값이 올바른 유형인지 확인하고 싶을 수 있습니다.

이 당신에게 내가 BigQuery에서을 (아직) 사용하지 않은이

import ast 
lines = (p | beam.io.ReadStringsFromPubSub(known_args.input_topic) 
      | "JSON row to dict" >> beam.Map(
         lambda s: ast.literal_eval(s)) 
      | beam.io.WriteToBigQuery(...) 
     ) 

같은 파이프 라인을 줄 것이다, 그래서 나는 마지막 줄에 당신을 도울 수 없어요,하지만 당신이 쓴 것은 처음 눈에 올바른 보인다.

관련 문제