2017-03-08 2 views
2

Google Cloud Dataflow를 사용하고 있고 PCollection의 모든 요소에 액세스해야하는 ParDo 기능이 있습니다. 이를 달성하기 위해, 나는 모든 요소의 단일 반복 가능을 포함하는 PCollection을 PCollection으로 변환하기를 원했습니다. 나는 내가 생각해 낸 것에 클리너/심플/더 빠른 해결책이 있는지 궁금해하고 있었다.PCollection에 PCollection <T>을 결합하는 간단한 방법 <Iterable<T>>

첫 번째 방법은 더미 키를 만들고 GroupByKey를 수행 한 후 값을 가져 오는 것입니다.

PCollection<MyType> myData; 
// AddDummyKey() outputs KV.of(1, context.element()) for everything 
PCollection<KV<Integer, MyType>> myDataKeyed = myData.apply(ParDo.of(new AddDummyKey())); 
// Group by dummy key 
PCollection<KV<Integer, Iterable<MyType>>> myDataGrouped = myDataKeyed.apply(GroupByKey.create()); 
// Extract values 
PCollection<Iterable<MyType>> myDataIterable = myDataGrouped.apply(Values.<Iterable<MyType>>create() 

번째 방법

여기서 추천 따랐다 How do I make View's asList() sortable in Google Dataflow SDK?하지만 정렬없이. View.asList()를 만들고 더미 PCollection을 만든 다음 뷰와 함께 사이드 PCollection에 ParDo 함수를 적용하여 뷰를 반환했습니다.

PCollection<MyType> myData; 
// Create view of the PCollection as a list 
PCollectionView<List<MyType>> myDataView = myData.apply(View.asList()); 
// Create dummy PCollection 
PCollection<Integer> dummy = pipeline.apply(Create.<Integer>of(1)); 
// Apply dummy ParDo that returns the view 
PCollection<List<MyType>> myDataList = dummy.apply(
     ParDo.withSideInputs(myDataView).of(new DoFn<Integer, List<MyType>>() { 
      @Override 
      public void processElement(ProcessContext c) { 
       c.output(c.sideInput(myDataView)); 
      } 
     })); 

이 작업에는 미리 정의 된 결합 기능이있는 것처럼 보이지만 찾을 수 없습니다. 도와 주셔서 감사합니다!

답변

1

당신이 모든 것을 필요로한다는 것을 알고 있다면, 두 가지 방법 모두 합리적입니다. 둘 다 Dataflow SDK에서 사용되었고 나중에 Apache Beam SDK가 될 때 사용되었습니다.

  1. 사이드 입력 후 모든 것을 출력합니다. 이것은 실제로 DataflowAssert의 작동 방식입니다. 다른 백엔드 러너가 측면 입력을 다르게 구현할 수있는 Beam에서는 가정이 거의없고 매우 큰 측면 입력의 스트리밍을 허용 할 수 있으므로 View.asIterable()을 선호합니다.
  2. 단일 키로 그룹화 한 다음 키를 놓으십시오. 이것이 빔의 후속 모델 PAssert의 작동 방식입니다. 동일한 작업을 수행하고 빈 콜렉션을 조금 더 신경 써야하지만 더 많은 빔 러너는 측면 입력 지원보다 우수한 GroupByKey 지원을 제공합니다 (특히 새롭고 아직 개발중인 경우).

그래서 View.asIterable()은 기본적으로 사용자가 원하는 것입니다. 두 번째 버전을 수행하는 GroupGlobally 변환에 대한 요청도있었습니다. 그것은 어떤 시점에서 일어날 수 있습니다.

관련 문제