Google Cloud Dataflow를 사용하고 있고 PCollection의 모든 요소에 액세스해야하는 ParDo 기능이 있습니다. 이를 달성하기 위해, 나는 모든 요소의 단일 반복 가능을 포함하는 PCollection을 PCollection으로 변환하기를 원했습니다. 나는 내가 생각해 낸 것에 클리너/심플/더 빠른 해결책이 있는지 궁금해하고 있었다.PCollection에 PCollection <T>을 결합하는 간단한 방법 <Iterable<T>>
첫 번째 방법은 더미 키를 만들고 GroupByKey를 수행 한 후 값을 가져 오는 것입니다.
PCollection<MyType> myData;
// AddDummyKey() outputs KV.of(1, context.element()) for everything
PCollection<KV<Integer, MyType>> myDataKeyed = myData.apply(ParDo.of(new AddDummyKey()));
// Group by dummy key
PCollection<KV<Integer, Iterable<MyType>>> myDataGrouped = myDataKeyed.apply(GroupByKey.create());
// Extract values
PCollection<Iterable<MyType>> myDataIterable = myDataGrouped.apply(Values.<Iterable<MyType>>create()
번째 방법
여기서 추천 따랐다 How do I make View's asList() sortable in Google Dataflow SDK?하지만 정렬없이. View.asList()를 만들고 더미 PCollection을 만든 다음 뷰와 함께 사이드 PCollection에 ParDo 함수를 적용하여 뷰를 반환했습니다.PCollection<MyType> myData;
// Create view of the PCollection as a list
PCollectionView<List<MyType>> myDataView = myData.apply(View.asList());
// Create dummy PCollection
PCollection<Integer> dummy = pipeline.apply(Create.<Integer>of(1));
// Apply dummy ParDo that returns the view
PCollection<List<MyType>> myDataList = dummy.apply(
ParDo.withSideInputs(myDataView).of(new DoFn<Integer, List<MyType>>() {
@Override
public void processElement(ProcessContext c) {
c.output(c.sideInput(myDataView));
}
}));
이 작업에는 미리 정의 된 결합 기능이있는 것처럼 보이지만 찾을 수 없습니다. 도와 주셔서 감사합니다!