나는 성공적으로 JSON 객체로로드 함수를 정의하여 pubsub 문자열을 구문 분석 할 수 있었다 (parse_pubsub()를 참조). 내가 직면 한 이상한 문제는 글로벌 범위에서 json을 가져올 수 없다는 것입니다. "NameError : 글로벌 이름 'json'이 정의되지 않았습니다."오류가 발생했습니다. 함수 내에서 json을 가져와야했습니다.
from __future__ import absolute_import
import logging
import argparse
import apache_beam as beam
import apache_beam.transforms.window as window
'''Normalize pubsub string to json object'''
# Lines look like this:
# {'datetime': '2017-07-13T21:15:02Z', 'mac': 'FC:FC:48:AE:F6:94', 'status': 1}
def parse_pubsub(line):
import json
record = json.loads(line)
return (record['mac']), (record['status']), (record['datetime'])
def run(argv=None):
"""Build and run the pipeline."""
parser = argparse.ArgumentParser()
parser.add_argument(
'--input_topic', required=True,
help='Input PubSub topic of the form "/topics/<PROJECT>/<TOPIC>".')
parser.add_argument(
'--output_table', required=True,
help=
('Output BigQuery table for results specified as: PROJECT:DATASET.TABLE '
'or DATASET.TABLE.'))
known_args, pipeline_args = parser.parse_known_args(argv)
with beam.Pipeline(argv=pipeline_args) as p:
# Read the pubsub topic into a PCollection.
lines = (p | beam.io.ReadStringsFromPubSub(known_args.input_topic)
| beam.Map(parse_pubsub)
| beam.Map(lambda (mac_bq, status_bq, datetime_bq): {'mac': mac_bq, 'status': status_bq, 'datetime': datetime_bq})
| beam.io.WriteToBigQuery(
known_args.output_table,
schema=' mac:STRING, status:INTEGER, datetime:TIMESTAMP',
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND)
)
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
의견에 감사드립니다. 좀 더 실험을하면 들어오는 pub/sub 메시지가 문자열 (분명히)으로 들어오는 것처럼 보입니다. 선 객체를 사전으로 변환하는 변환을 적용해야합니다. 데이터 흐름에서 발생한 오류 메시지는 다음과 같습니다. ** 입력 유형 힌트 위반 : 예상되는 튜플 [TypeVariable [K], TypeVariable [V]], ** –
glux