2016-06-14 14 views
2

새로운 Google의 데이터 흐름 버전 1.6으로 업그레이드했으며 로컬 컴퓨터에서 테스트 할 때 내 파이프 라인 끝에 java.lang.IllegalStateException이 발생합니다. 버전 1.5.1에서는이 문제가 발생하지 않았습니다.Google 데이터 흐름에서 java.lang.IllegalStateException이 발생하는 이유는 무엇입니까?

로컬에서만 실제 환경에서는 발생하지 않습니다. 새로운 버전의 버그입니까? 오류가 발생하지 않도록 코드를 변경해야합니까?

문제점을 찾기 위해 내 파이프 라인 부분을 연결했습니다.

private static void getTableRowAndWrite(final PCollection<KV<Integer, Iterable<byte[]>>> groupedTransactions, final String tableName) { 
    // Get the tableRow element from the PCollection 
    groupedTransactions 
      .apply(ParDo 
        .of(((tableName.equals("avail")) ? new GetTableRowAvail() : new GetTableRowReservation())) //Get a TableRow 
        .named("Get " + tableName + " TableRows")) 
      .apply(BigQueryIO 
        .Write 
        .named("Write to BigQuery " + tableName) //Write to BigQuery 
        .withSchema(createTableSchema()) 
        .to((SerializableFunction<BoundedWindow, String>) window -> { 
         String date = window.toString(); 
         String date2 = date.substring(1, 5) + date.substring(6, 8) + date.substring(9, 11); 
         return "travelinsights-1056:hotel." + tableName + "_full_" + (TEST ? "test_" : "") + date2; 
        }) 
        .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
        .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) 
      ); 
} 

오류 :

Exception in thread "main" java.lang.IllegalStateException: Cleanup time 294293-06-23T12:00:54.774Z is beyond end-of-time 
at com.google.cloud.dataflow.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:199) 
at com.google.cloud.dataflow.sdk.util.ReduceFnRunner.onTimer(ReduceFnRunner.java:642) 
at com.google.cloud.dataflow.sdk.util.BatchTimerInternals.advance(BatchTimerInternals.java:134) 
at com.google.cloud.dataflow.sdk.util.BatchTimerInternals.advanceInputWatermark(BatchTimerInternals.java:110) 
at com.google.cloud.dataflow.sdk.util.GroupAlsoByWindowsViaOutputBufferDoFn.processElement(GroupAlsoByWindowsViaOutputBufferDoFn.java:91) 
at com.google.cloud.dataflow.sdk.util.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:49) 
at com.google.cloud.dataflow.sdk.util.DoFnRunnerBase.processElement(DoFnRunnerBase.java:138) 
at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateHelper(ParDo.java:1229) 
at com.google.cloud.dataflow.sdk.transforms.ParDo.evaluateSingleHelper(ParDo.java:1098) 
at com.google.cloud.dataflow.sdk.transforms.ParDo.access$300(ParDo.java:457) 
at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1084) 
at com.google.cloud.dataflow.sdk.transforms.ParDo$1.evaluate(ParDo.java:1079) 
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.visitTransform(DirectPipelineRunner.java:858) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:219) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformTreeNode.visit(TransformTreeNode.java:215) 
at com.google.cloud.dataflow.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:102) 
at com.google.cloud.dataflow.sdk.Pipeline.traverseTopologically(Pipeline.java:259) 
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner$Evaluator.run(DirectPipelineRunner.java:814) 
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:526) 
at com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner.run(DirectPipelineRunner.java:96) 
at com.google.cloud.dataflow.sdk.Pipeline.run(Pipeline.java:180) 
+1

나는이 문제가 파이프 라인의 다른 곳에 있다고 믿는다. 스택 추적은 창 끝에 허용되는 지연 시간을 더한 값이 데이터 흐름에서 허용되는 최대 시간 소인을 초과하도록하는 창이 있음을 의미합니다. 타임 스탬프를 요소에 넣고 창에 넣는 파이프 라인 부분을 기꺼이 공유 할 의향이 있습니까? –

답변

3

당신은 버그를 발견!

이 내용은 BEAM-341으로 제출되었으며 수정 된 내용은 검토 후 즉시 Dataflow Java SDK에 이식 될 #464으로 검토 중입니다.

창 작업, 트리거링 및 허용 된 지연을 설정하는 코드를 보지 않고도 이것이 어떻게 영향을 미치는지 확신 할 수 없습니다. 그러나 비 전역 창 및 허용 된 대기 시간이 매우 길면 창을 "끝날 때까지"만료시키지 않는 간단한 해결 방법이 있습니다. 이 경우 효과적으로 무한 대신에 매우 큰 (예 : 수백 년) 허용 된 대기 시간으로 작업을 업데이트 할 수 있습니다.

관련 문제