2017-12-27 2 views
0

기존 파이프 라인을 데이터 흐름 2.x로 마이그레이션 중입니다. 파이프 라인의 마지막 단계에서 데이터가 Google 클라우드 서비스에 기록됩니다. 데이터를 .gz로 압축해야합니다. 이전에는 (데이터 흐름 1.x 구현에서) 우리는 우리를 위해이 작업을 위해 자체 Sink를 작성했습니다. 데이터 흐름 2.x에는이 작업을 수행 할 수있는 기본 방법이 있습니다. 나는 올바른 코드가 있어야하지만, 자바 컴파일러는 잘못된 유형을 반환하는 TextIO.write()에 대해 불평하고 있습니다. 코드는 다음과 같습니다 :Dataflow 2.x가 PCollectionTuple.apply() 호출시 잘못된 매개 변수 유형에 대해 불평 함

PCollectionTuple results = /* some transforms */ 

// write main result 
results.get(mainOutputTag). 
apply("WriteProfile", TextIO.write().to(outputBucket) 
.withSuffix(".json")   
.withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP) 
.withNumShards(numChunks)); 

자바 컴파일러는이 오류와 함께, 불평 :

The method apply(String, PTransform<? super PCollection<TableRow>,OutputT>) in the type PCollection<TableRow> is not applicable for the arguments (String, TextIO.Write) 

사람은 문제가 위의 내 코드가 무엇인지 볼 수 있습니까? 더 많은 상황이 필요하면 알려주십시오.

답변

0

문제가 해결되었습니다. 문제는 파일에 PCollection<TableRow>을 쓰려고 시도한 것이므로 PCollection<String> 만 파일에 쓸 수 있습니다.

PCollectionTuple results = /* some transforms */ 

// write main result 
results.get(mainOutputTag) /* PCollection<TableRow> */ 

    .apply(ParDo.of(new DoFn<TableRow, String>() { 
     @ProcessElement 
     public void processElement(ProcessContext c) { 
      c.output(c.element().toString()); 
     } 
    })) /* PCollection<String> */ 

    .apply("WriteProfile", TextIO.write().to(outputBucket) 
    .withSuffix(".json")   
    .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP) 
    .withNumShards(numChunks)); 
:

내 최종 솔루션이었다
관련 문제