2016-11-02 3 views
1

파일을 압축 해제하기위한 맞춤 싱크를 작성하려고합니다.Google Dataflow : java.lang.IllegalArgumentException : setCoder (null) 수 없습니다.

public static class ZipIO{  
    public static class Sink extends com.google.cloud.dataflow.sdk.io.Sink<String> { 

    private static final long serialVersionUID = -7414200726778377175L; 
    private final String unzipTarget; 

     public Sink withDestinationPath(String s){ 
     if(s!=""){ 
      return new Sink(s); 
     } 
     else { 
      throw new IllegalArgumentException("must assign destination path"); 
     } 

     } 

     protected Sink(String path){ 
      this.unzipTarget = path; 
     } 

     @Override 
     public void validate(PipelineOptions po){ 
      if(unzipTarget==null){ 
       throw new RuntimeException(); 
      } 
     } 

     @Override 
     public ZipFileWriteOperation createWriteOperation(PipelineOptions po){ 
      return new ZipFileWriteOperation(this); 
     } 

    } 

    private static class ZipFileWriteOperation extends WriteOperation<String, UnzipResult>{ 

    private static final long serialVersionUID = 7976541367499831605L; 
    private final ZipIO.Sink sink; 

     public ZipFileWriteOperation(ZipIO.Sink sink){ 
      this.sink = sink; 
     } 



     @Override 
     public void initialize(PipelineOptions po) throws Exception{ 

     } 

     @Override 
     public void finalize(Iterable<UnzipResult> writerResults, PipelineOptions po) throws Exception { 
     long totalFiles = 0; 
     for(UnzipResult r:writerResults){ 
      totalFiles +=r.filesUnziped; 
     } 
     LOG.info("Unzipped {} Files",totalFiles); 
     } 

     @Override 
     public ZipIO.Sink getSink(){ 
      return sink; 
     } 

     @Override 
     public ZipWriter createWriter(PipelineOptions po) throws Exception{ 
      return new ZipWriter(this); 
     } 

    } 

    private static class ZipWriter extends Writer<String, UnzipResult>{ 
     private final ZipFileWriteOperation writeOp; 
     public long totalUnzipped = 0; 

     ZipWriter(ZipFileWriteOperation writeOp){ 
      this.writeOp = writeOp; 
     } 

     @Override 
     public void open(String uID) throws Exception{ 
     } 

     @Override 
     public void write(String p){ 
      System.out.println(p); 
     } 

     @Override 
     public UnzipResult close() throws Exception{ 
      return new UnzipResult(this.totalUnzipped); 
     } 

     @Override 
     public ZipFileWriteOperation getWriteOperation(){ 
      return writeOp; 
     } 


    } 

    private static class UnzipResult implements Serializable{ 
    private static final long serialVersionUID = -8504626439217544799L; 
    public long filesUnziped=0;  
     public UnzipResult(long filesUnziped){ 
      this.filesUnziped=filesUnziped; 
     } 
    } 
} 

}

처리가 오류로 실패 : 어떤 도움이 이해된다

Exception in thread "main" java.lang.IllegalArgumentException: Cannot setCoder(null) at com.google.cloud.dataflow.sdk.values.TypedPValue.setCoder(TypedPValue.java:67) at com.google.cloud.dataflow.sdk.values.PCollection.setCoder(PCollection.java:150) at com.google.cloud.dataflow.sdk.io.Write$Bound.createWrite(Write.java:380) at com.google.cloud.dataflow.sdk.io.Write$Bound.apply(Write.java:112) at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$BatchWrite.apply(DataflowPipelineRunner.java:2118) at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner$BatchWrite.apply(DataflowPipelineRunner.java:2099) at com.google.cloud.dataflow.sdk.runners.PipelineRunner.apply(PipelineRunner.java:75) at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:465) at com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner.apply(BlockingDataflowPipelineRunner.java:169) at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:368) at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:275) at com.google.cloud.dataflow.sdk.runners.DataflowPipelineRunner.apply(DataflowPipelineRunner.java:463) at com.google.cloud.dataflow.sdk.runners.BlockingDataflowPipelineRunner.apply(BlockingDataflowPipelineRunner.java:169) at com.google.cloud.dataflow.sdk.Pipeline.applyInternal(Pipeline.java:368) at com.google.cloud.dataflow.sdk.Pipeline.applyTransform(Pipeline.java:291) at com.google.cloud.dataflow.sdk.values.PCollection.apply(PCollection.java:174) at com.mcd.de.tlogdataflow.StarterPipeline.main(StarterPipeline.java:93)

간단한 코드를 갖는.

감사 필립

답변

0

이 충돌은 또한 아파치 빔 (배양) 자바 SDK에 존재했던 데이터 흐름 자바 SDK (specifically, this line) 버그로 인해 발생 & BR.

Sink.WriterOperation#getWriterResultCoder() 메서드는 항상 오버라이드해야하지만, abstract으로 표시하지 못했습니다. Beam에서는 고정되어 있지만 Dataflow SDK에서는 변경되지 않았습니다. 이 메서드를 재정 의하여 적절한 코더를 반환해야합니다. 당신이 사용할 수 있도록

  • 그냥 대신 UnzipResult 구조의 long를 사용 VarLongCoder 중 하나 BigEndianLongCoder 포장, 자신의 작은 코더 클래스를 작성

    1. 을 :

      당신은 코더 마련하기 위해 몇 가지 옵션이 있습니다 그것들은 그대로이다.

    2. 적은 것이 좋습니다 인해 초과 크기, 당신은 SerializableCoder.of(UnzipResult.class)
  • +0

    안녕 켄, 당신의 도움에 대한 감사를 사용할 수 있습니다. close() 작업을 long으로 반환하여 Long 구조로 변경했습니다. 여전히, 나는 setCoder (null) Exception과 같을 수 없다. – bigdataclown

    +0

    안녕하세요, Kenn, 해결 방법을 찾으셨습니까? 감사 & BR Philipp – bigdataclown

    +0

    'getWriterResultCoder()'를 오버라이드 했습니까? 그것이 당신이해야 할 일입니다. –

    관련 문제