Google 스패너 데이터베이스에서 테이블을 읽으려고하고 텍스트 파일에 쓰기 위해 python SDK와 함께 google dataflow를 사용하여 백업합니다.google dataflow 스패너에서 읽음
from __future__ import absolute_import
import argparse
import itertools
import logging
import re
import time
import datetime as dt
import logging
import apache_beam as beam
from apache_beam.io import iobase
from apache_beam.io import WriteToText
from apache_beam.io.range_trackers import OffsetRangeTracker, UnsplittableRangeTracker
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions, SetupOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from google.cloud.spanner.client import Client
from google.cloud.spanner.keyset import KeySet
BUCKET_URL = 'gs://my_bucket'
OUTPUT = '%s/output/' % BUCKET_URL
PROJECT_ID = 'my_project'
INSTANCE_ID = 'my_instance'
DATABASE_ID = 'my_db'
JOB_NAME = 'spanner-backup'
TABLE = 'my_table'
class SpannerSource(iobase.BoundedSource):
def __init__(self):
logging.info('Enter __init__')
self.spannerOptions = {
"id": PROJECT_ID,
"instance": INSTANCE_ID,
"database": DATABASE_ID
}
self.SpannerClient = Client
def estimate_size(self):
logging.info('Enter estimate_size')
return 1
def get_range_tracker(self, start_position=None, stop_position=None):
logging.info('Enter get_range_tracker')
if start_position is None:
start_position = 0
if stop_position is None:
stop_position = OffsetRangeTracker.OFFSET_INFINITY
range_tracker = OffsetRangeTracker(start_position, stop_position)
return UnsplittableRangeTracker(range_tracker)
def read(self, range_tracker): # This is not called when using the dataflowRunner !
logging.info('Enter read')
# instantiate spanner client
spanner_client = self.SpannerClient(self.spannerOptions["id"])
instance = spanner_client.instance(self.spannerOptions["instance"])
database = instance.database(self.spannerOptions["database"])
# read from table
table_fields = database.execute_sql("SELECT t.column_name FROM information_schema.columns AS t WHERE t.table_name = '%s'" % TABLE)
table_fields.consume_all()
self.columns = [x[0] for x in table_fields]
keyset = KeySet(all_=True)
results = database.read(table=TABLE, columns=self.columns, keyset=keyset)
# iterator over rows
results.consume_all()
for row in results:
JSON_row = {
self.columns[i]: row[i] for i in range(len(self.columns))
}
yield JSON_row
def split(self, start_position=None, stop_position=None):
# this should not be called since the source is unspittable
logging.info('Enter split')
if start_position is None:
start_position = 0
if stop_position is None:
stop_position = 1
# Because the source is unsplittable (for now), only a single source is returned
yield iobase.SourceBundle(
weight=1,
source=self,
start_position=start_position,
stop_position=stop_position)
def run(argv=None):
"""Main entry point"""
pipeline_options = PipelineOptions()
google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
google_cloud_options.project = PROJECT_ID
google_cloud_options.job_name = JOB_NAME
google_cloud_options.staging_location = '%s/staging' % BUCKET_URL
google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL
#pipeline_options.view_as(StandardOptions).runner = 'DirectRunner'
pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
p = beam.Pipeline(options=pipeline_options)
output = p | 'Get Rows from Spanner' >> beam.io.Read(SpannerSource())
iso_datetime = dt.datetime.now().replace(microsecond=0).isoformat()
output | 'Store in GCS' >> WriteToText(file_path_prefix=OUTPUT + iso_datetime + '-' + TABLE, file_name_suffix='') # if this line is commented, job completes but does not do anything
result = p.run()
result.wait_until_finish()
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
run()
그러나,이 스크립트 만 DirectRunner에 제대로 실행 : 나는 그것이 DataflowRunner에서 실행하게하는 경우,이 오류로 종료하기 전에, 모든 출력이없는 동안 실행 나는 다음과 같은 스크립트를 작성했습니다 :
"Executing failure step failure14 [...] Workflow failed. Causes: [...] The worker lost contact with the service."
때때로 출력을 생성하지 않고 영원히 계속됩니다.
또한 'output = ...'행에 주석을 달면 작업이 완료되지만 실제로 데이터를 읽지는 않습니다.
또한 dataflowRunner는 소스의 'estimate_size'함수를 호출하지만 'read'또는 'get_range_tracker'함수는 호출하지 않습니다.
이 문제의 원인에 대한 의견이있는 사람이 있습니까? 실험용 스패너 소스/싱크를 사용할 수있는 (더 완전한) Java SDK가 있다는 것을 알고 있습니다.하지만 가능한 경우 Python을 사용하고 싶습니다.
덕분에
우리는 우선 순위를 봤는데 자바 데이터 흐름이 나는 ParDos를 사용하여 파이썬 커넥터를 자바를 사용하거나 구현하는 당신에게 추천 할 것입니다. 첫 번째 커넥터, https://beam.apache.org/documentation/sdks 참조/python-custom-io/ –
@MairbekKhadikov에 감사드립니다. 당분간 ParDo 방식을 시도해 보겠습니다. –