0

에 일련 번호를 추가합니다. 저는 Apache Bea와 Python, DataFlow 및 BigQuery를 사용하고 있습니다.아파치 빔 내가 차원 테이블을로드 할 ETL을 구축을 위해 노력하고 있어요 PCollection

나는 BigQuery에로의로드하기 위해 pcollection의 각 요소에 일련 번호를 지정해야하지만이 작업을 수행 할 수있는 방법을 찾을 수 없습니다.

DataFlow가 이전 집계를 만들기 위해 필요하다고 생각하고 시퀀스 번호를 추가하기 위해 최종 pcollection을 얻으 려하지만이 순간에 병렬 처리를 중지하고 내 pcollection을 목록에 캐스트해야합니다 (예 : .collect())를 사용하고 쉬운 루프를 만들어 시퀀스 번호를 지정하십시오. 맞아?

는 파이프 라인 내가 코딩했습니다되어 How to get a list of elements out of a PCollection in Google Dataflow and use it in the pipeline to loop Write Transforms?

가 어떻게 그것을 얻을 수 있습니다

p | ReadFromAvro(known_args.input) | beam.Map(adapt) | beam.GroupByKey() | beam.Map(adaptGroupBy) 

내가 pcollection에서 목록을 얻을 수있는 방법은 없습니다 읽은? 어떤 도움이 필요합니까? 당신이 원하는 것은 PCollection의 각 요소와 목록을 가져올 경우

+0

코드를 참조 측 입력에 대한 자세한 내용은? –

+0

이것은 빔을 사용하는 첫 번째 방법입니다. 나는 코드 조각을 추가하려고하지만 어떤 식 으로든 찾을 수 없다. –

+0

시퀀스 번호를 추가해야한다고 생각하는 이유에 대해 자세히 설명해 주시겠습니까? BigQuery에서이 일련 번호가 필요한 것은 무엇입니까? –

답변

1

, 당신은 측면 입력을 사용할 수 있습니다. 이렇게하면 결과에서 모든 병렬 처리가 제거되고 파이프 라인이 느려질 수 있습니다. 다음

여전히이 작업을 수행하려면

:

side_input_coll = beam.pvalue.AsIterable(my_collection) 

(p 
| beam.Create([0]) 
| beam.FlatMap(lambda _, my_seq: [(elem, i) for i, elem in enumerate(my_seq)], 
       my_seq=side_input_coll)) 

하지만 보존하는 병렬 처리를 잊지 마세요, 단순히 임의의 ID를 생성하는 것이 가장 좋습니다. PCollections은 본질적으로 정렬되어 있지 않습니다.

Beam Programming Guide on Side Inputs

당신이, 당신이 지금까지 시도 한 내용을 게시 할 수
관련 문제