0

Python SDK와 함께 Google Dataflow를 사용하여 Google Spanner 데이터베이스의 테이블을 읽고, 백업을 위해 그 내용을 텍스트 파일에 쓰려고 합니다. (제목: Google Dataflow에서 Spanner 읽기)

from __future__ import absolute_import 

import argparse 
import itertools 
import logging 
import re 
import time 
import datetime as dt 
import logging 

import apache_beam as beam 
from apache_beam.io import iobase 
from apache_beam.io import WriteToText 
from apache_beam.io.range_trackers import OffsetRangeTracker, UnsplittableRangeTracker 
from apache_beam.metrics import Metrics 
from apache_beam.options.pipeline_options import PipelineOptions 
from apache_beam.options.pipeline_options import StandardOptions, SetupOptions 
from apache_beam.options.pipeline_options import GoogleCloudOptions 

from google.cloud.spanner.client import Client 
from google.cloud.spanner.keyset import KeySet 

# Cloud Storage bucket that holds the staging, temp and output files.
BUCKET_URL = 'gs://my_bucket'
# Folder (inside the bucket) that receives the exported rows.
OUTPUT = '%s/output/' % (BUCKET_URL,)

# Coordinates of the Spanner database that is exported.
PROJECT_ID = 'my_project'
INSTANCE_ID = 'my_instance'
DATABASE_ID = 'my_db'

# Dataflow job name and the Spanner table the job reads.
JOB_NAME = 'spanner-backup'
TABLE = 'my_table'


class SpannerSource(iobase.BoundedSource):
    """Unsplittable Beam source that reads every row of one Spanner table.

    The table is chosen by the module-level ``TABLE`` constant and the
    database by ``PROJECT_ID`` / ``INSTANCE_ID`` / ``DATABASE_ID``. Each row
    is emitted as a dict mapping column name to value.
    """

    def __init__(self):
        logging.info('Enter __init__')

        # Connection coordinates, copied from the module constants.
        self.spannerOptions = {
            "id": PROJECT_ID,
            "instance": INSTANCE_ID,
            "database": DATABASE_ID
        }
        # Only the client class is stored here; the client object itself is
        # created inside read().
        self.SpannerClient = Client

    def estimate_size(self):
        """Return a fixed placeholder size of 1 (no real estimate is made)."""
        logging.info('Enter estimate_size')
        return 1

    def get_range_tracker(self, start_position=None, stop_position=None):
        """Return an unsplittable tracker over [start_position, stop_position).

        Args:
            start_position: start offset; defaults to 0.
            stop_position: stop offset; defaults to
                ``OffsetRangeTracker.OFFSET_INFINITY``.
        """
        logging.info('Enter get_range_tracker')
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = OffsetRangeTracker.OFFSET_INFINITY

        range_tracker = OffsetRangeTracker(start_position, stop_position)
        return UnsplittableRangeTracker(range_tracker)

    def read(self, range_tracker):
        """Yield every row of ``TABLE`` as a ``{column_name: value}`` dict.

        Args:
            range_tracker: tracker supplied by the runner; not consulted
                because the whole table is read in one pass.

        NOTE(review): the original author observed that this method was not
        invoked under the DataflowRunner - confirm after deploying.
        """
        logging.info('Enter read')
        # Instantiate the Spanner client and navigate to the database.
        spanner_client = self.SpannerClient(self.spannerOptions["id"])
        instance = spanner_client.instance(self.spannerOptions["instance"])
        database = instance.database(self.spannerOptions["database"])

        # Discover the column names of the table.
        table_fields = database.execute_sql(
            "SELECT t.column_name FROM information_schema.columns AS t "
            "WHERE t.table_name = '%s'" % TABLE)
        table_fields.consume_all()
        self.columns = [x[0] for x in table_fields]

        # Read all rows, requesting the columns in the order found above.
        keyset = KeySet(all_=True)
        results = database.read(table=TABLE, columns=self.columns, keyset=keyset)

        results.consume_all()
        for row in results:
            # Pair each column name with the value at the same position.
            yield dict(zip(self.columns, row))

    def split(self, desired_bundle_size=None, start_position=None, stop_position=None):
        """Yield the single bundle that covers this whole source.

        Args:
            desired_bundle_size: accepted so the signature lines up with
                ``BoundedSource.split``, where it is the first positional
                argument; ignored because the source is never subdivided.
            start_position: start offset of the bundle; defaults to 0.
            stop_position: stop offset of the bundle; defaults to 1.
        """
        logging.info('Enter split')
        if start_position is None:
            start_position = 0
        if stop_position is None:
            stop_position = 1

        # The source is unsplittable (for now), so exactly one bundle that
        # wraps this same source object is produced.
        yield iobase.SourceBundle(
            weight=1,
            source=self,
            start_position=start_position,
            stop_position=stop_position)


def run(argv=None):
    """Build and run the Dataflow pipeline that exports ``TABLE`` to GCS.

    Args:
        argv: optional list of command-line flags forwarded to
            ``PipelineOptions``. ``None`` (the default) behaves like the
            argument-less ``PipelineOptions()`` call used previously.
    """
    pipeline_options = PipelineOptions(argv)
    # The source refers to module-level names (Client, TABLE, logging, ...).
    # Saving the main session ships those globals to the remote workers.
    pipeline_options.view_as(SetupOptions).save_main_session = True

    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = PROJECT_ID
    google_cloud_options.job_name = JOB_NAME
    google_cloud_options.staging_location = '%s/staging' % BUCKET_URL
    google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL

    # Set the runner to 'DirectRunner' instead to execute locally.
    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
    p = beam.Pipeline(options=pipeline_options)

    output = p | 'Get Rows from Spanner' >> beam.io.Read(SpannerSource())
    # Timestamp (second resolution) that makes each backup's file prefix unique.
    iso_datetime = dt.datetime.now().replace(microsecond=0).isoformat()
    # The original author noted that without this sink the job completes
    # without doing anything.
    output | 'Store in GCS' >> WriteToText(
        file_path_prefix=OUTPUT + iso_datetime + '-' + TABLE,
        file_name_suffix='')

    result = p.run()
    result.wait_until_finish()


if __name__ == '__main__':
    # Show INFO-level log messages, then launch the pipeline.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()

위와 같은 스크립트를 작성했습니다. 그러나 이 스크립트는 DirectRunner에서만 제대로 실행됩니다. DataflowRunner에서 실행하면 한동안 아무런 출력 없이 실행되다가 다음 오류와 함께 종료됩니다:

"Executing failure step failure14 [...] Workflow failed. Causes: [...] The worker lost contact with the service."

때때로 출력을 생성하지 않고 영원히 계속됩니다.

또한 'output = ...' 행을 주석 처리하면 작업은 완료되지만 실제로 데이터를 읽지는 않습니다.

또한 DataflowRunner는 소스의 'estimate_size' 함수는 호출하지만 'read'나 'get_range_tracker' 함수는 호출하지 않는 것으로 보입니다.

이 문제의 원인에 대해 의견이 있으신 분이 계신가요? 실험적인 Spanner 소스/싱크를 사용할 수 있는 (더 완전한) Java SDK가 있다는 것은 알고 있습니다. 하지만 가능하다면 Python을 계속 사용하고 싶습니다.

감사합니다.

+0

저희는 Java Dataflow 커넥터를 우선적으로 개발했습니다. Java를 사용하시거나, ParDo를 사용하여 Python 커넥터를 직접 구현하시는 것을 권장합니다. https://beam.apache.org/documentation/sdks/python-custom-io/ 를 참조하세요. –

+0

@MairbekKhadikov에 감사드립니다. 당분간 ParDo 방식을 시도해 보겠습니다. –

답변

0

BoundedSource 클래스를 사용하는 대신 단순히 ParDo를 사용하라는 제안에 따라 코드를 다시 작성했습니다. 참고로 제 해결책을 아래에 올립니다. 개선할 수 있는 방법이 많이 있을 것이라고 확신하며, 의견을 주시면 기쁘겠습니다. 특히 파이프라인을 시작할 때 더미 PCollection을 만들어야 한다는 점이 놀랍습니다. 그렇게 하지 않으면 제가 해결할 수 없었던 다음 오류가 발생합니다:

AttributeError: 'PBegin' object has no attribute 'windowing'

더미 PCollection은 약간 해킹처럼 느껴집니다.

from __future__ import absolute_import 

import datetime as dt 
import logging 

import apache_beam as beam 
from apache_beam.io import WriteToText 
from apache_beam.options.pipeline_options import PipelineOptions 
from apache_beam.options.pipeline_options import StandardOptions, SetupOptions 
from apache_beam.options.pipeline_options import GoogleCloudOptions 
from google.cloud.spanner.client import Client 
from google.cloud.spanner.keyset import KeySet 

# Cloud Storage bucket that holds the staging, temp and output files.
BUCKET_URL = 'gs://my_bucket'
# Folder (inside the bucket) that receives the exported rows.
OUTPUT = '%s/some_folder/' % (BUCKET_URL,)

# Coordinates of the Spanner database that is exported.
PROJECT_ID = 'my_project'
INSTANCE_ID = 'my_instance'
DATABASE_ID = 'my_database'

# Name given to the Dataflow job.
JOB_NAME = 'my_jobname'

class ReadTables(beam.DoFn):
    """DoFn that emits the table names found in a Spanner database.

    Names are taken from ``information_schema.tables``; the five
    information-schema tables listed in ``process`` are left out.
    """

    def __init__(self, project, instance, database):
        """Remember which project / instance / database to query."""
        super(ReadTables, self).__init__()
        self._project, self._instance, self._database = project, instance, database

    def process(self, element):
        """Yield one table name per listed table; ``element`` is not used."""
        # Names returned by the listing query that must not be emitted.
        skipped = [u'COLUMNS', u'INDEXES', u'INDEX_COLUMNS', u'SCHEMATA', u'TABLES']

        spanner_client = Client(self._project)
        spanner_instance = spanner_client.instance(self._instance)
        spanner_database = spanner_instance.database(self._database)
        listing = spanner_database.execute_sql(
            'SELECT t.table_name FROM information_schema.tables AS t')

        for record in listing:
            table_name = record[0]
            if table_name not in skipped:
                yield table_name

class ReadSpannerTable(beam.DoFn):
    """DoFn that reads all rows of the Spanner table named by each element.

    For an input table name it emits ``(table_name, row_dict)`` pairs, where
    ``row_dict`` maps column name to value.
    """

    def __init__(self, project, instance, database):
        """Remember which project / instance / database to read from."""
        super(ReadSpannerTable, self).__init__()
        self._project = project
        self._instance = instance
        self._database = database

    def process(self, element):
        """Yield ``(element, row_dict)`` for every row of table ``element``.

        Args:
            element: name of the table to read.
        """
        # One database handle serves both the schema query and the read.
        database = Client(self._project).instance(self._instance).database(self._database)

        # First read the columns present in the table.
        table_fields = database.execute_sql(
            "SELECT t.column_name FROM information_schema.columns AS t "
            "WHERE t.table_name = '%s'" % element)
        columns = [x[0] for x in table_fields]

        # Next, read the actual data, requesting columns in that same order.
        keyset = KeySet(all_=True)
        results_streamed_set = database.read(table=element, columns=columns, keyset=keyset)

        for row in results_streamed_set:
            # zip pairs names with values by position and, unlike xrange,
            # exists on both Python 2 and Python 3.
            JSON_row = dict(zip(columns, row))
            yield (element, JSON_row)   # output pairs of (table_name, data)

def run(argv=None):
    """Build and run the Dataflow pipeline that dumps every table to GCS.

    Args:
        argv: optional list of command-line flags forwarded to
            ``PipelineOptions``. ``None`` (the default) behaves like the
            argument-less ``PipelineOptions()`` call used previously.
    """
    pipeline_options = PipelineOptions(argv)
    pipeline_options.view_as(SetupOptions).save_main_session = True
    pipeline_options.view_as(SetupOptions).requirements_file = "requirements.txt"
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    # PROJECT_ID is the constant this file defines; the former name PROJECT
    # was never assigned and raised NameError.
    google_cloud_options.project = PROJECT_ID
    google_cloud_options.job_name = JOB_NAME
    google_cloud_options.staging_location = '%s/staging' % BUCKET_URL
    google_cloud_options.temp_location = '%s/tmp' % BUCKET_URL

    pipeline_options.view_as(StandardOptions).runner = 'DataflowRunner'
    p = beam.Pipeline(options=pipeline_options)

    # A one-element dummy PCollection gives the first ParDo something to
    # consume; the element's value is ignored by ReadTables.
    init = p | 'Begin pipeline' >> beam.Create(["test"])
    # Read the tables in the db.
    tables = init | 'Get tables from Spanner' >> beam.ParDo(
        ReadTables(PROJECT_ID, INSTANCE_ID, DATABASE_ID))
    rows = (
        tables
        # For each table, read the entries.
        | 'Get rows from Spanner table' >> beam.ParDo(
            ReadSpannerTable(PROJECT_ID, INSTANCE_ID, DATABASE_ID))
        | 'Group by table' >> beam.GroupByKey()
        # Force the grouped values into a list (the original author saw the
        # DataflowRunner produce _UnwindowedValues here). The pair is indexed
        # rather than unpacked in the lambda signature, which Python 3 rejects.
        | 'Formatting' >> beam.Map(lambda kv: (kv[0], list(kv[1]))))

    # Timestamp (second resolution) that makes each backup's file prefix unique.
    iso_datetime = dt.datetime.now().replace(microsecond=0).isoformat()
    rows | 'Store in GCS' >> WriteToText(
        file_path_prefix=OUTPUT + iso_datetime, file_name_suffix='')

    result = p.run()
    result.wait_until_finish()

if __name__ == '__main__':
    # Show INFO-level log messages, then launch the pipeline.
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
관련 문제