2017-12-28 5 views
0

Cloud Pub/Sub에서 데이터를 읽고 Cloud Dataflow로 BigQuery에 쓰기를 원합니다. 각 데이터에는 데이터 자체가 저장 될 테이블 ID가 들어 있습니다.BigQueryIO.With가 던진 예외를 잡아 내고 출력에 실패한 데이터를 구하는 방법은 무엇입니까?

  • 테이블 ID 형식이 잘못 :

    는 BigQuery를 쓰기에 실패하는 것이 여러 요인이 있습니다.

  • 데이터 집합이 존재하지 않습니다.
  • 데이터 집합에서 파이프 라인에 액세스 할 수 없습니다.
  • 네트워크 오류입니다.

오류 중 하나가 발생하면 스트리밍 작업이 작업을 다시 시도하고 정지합니다. 불량 데이터를 구제하고 실속을 피하기 위해 WriteResult.getFailedInserts()을 사용해 보았지만 제대로 작동하지 않았습니다. 어떤 좋은 방법이 있습니까? 파이프 라인의 정의에 출력에 기록 할 때 예외를 포착하는 쉬운 방법이 없습니다

public class StarterPipeline { 
    private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class); 

    public class MyData implements Serializable { 
    String table_id; 
    } 

    public interface MyOptions extends PipelineOptions { 
    @Description("PubSub topic to read from, specified as projects/<project_id>/topics/<topic_id>") 
    @Validation.Required 
    ValueProvider<String> getInputTopic(); 
    void setInputTopic(ValueProvider<String> value); 
    } 

    public static void main(String[] args) { 
    MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class); 

    Pipeline p = Pipeline.create(options); 

    PCollection<MyData> input = p 
     .apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getInputTopic())) 
     .apply("ParseJSON", MapElements.into(TypeDescriptor.of(MyData.class)) 
      .via((String text) -> new Gson().fromJson(text, MyData.class))); 
    WriteResult writeResult = input 
     .apply("WriteToBigQuery", BigQueryIO.<MyData>write() 
      .to(new SerializableFunction<ValueInSingleWindow<MyData>, TableDestination>() { 
       @Override 
       public TableDestination apply(ValueInSingleWindow<MyData> input) { 
       MyData myData = input.getValue(); 
       return new TableDestination(myData.table_id, null); 
       } 
      }) 
      .withSchema(new TableSchema().setFields(new ArrayList<TableFieldSchema>() {{ 
       add(new TableFieldSchema().setName("table_id").setType("STRING")); 
      }})) 
      .withFormatFunction(new SerializableFunction<MyData, TableRow>() { 
       @Override 
       public TableRow apply(MyData myData) { 
       return new TableRow().set("table_id", myData.table_id); 
       } 
      }) 
      .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED) 
      .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND) 
      .withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry())); 
    writeResult.getFailedInserts() 
     .apply("LogFailedData", ParDo.of(new DoFn<TableRow, TableRow>() { 
      @ProcessElement 
      public void processElement(ProcessContext c) { 
      TableRow row = c.element(); 
      LOG.info(row.get("table_id").toString()); 
      } 
     })); 

    p.run(); 
    } 
} 

답변

1

:

여기 내 코드입니다. BigQuery에 대한 사용자 정의 PTransform을 작성하여이 작업을 수행 할 수 있다고 가정합니다. 그러나 Apache Beam에서 네이티브 방식으로 수행 할 수있는 방법은 없습니다. 클라우드 데이터 흐름의 자동 재시도 기능이 손상 될 수 있으므로이 문제를 권장합니다.

코드 예에서 실패한 삽입 재 시도 정책을 다시 시도하지 않도록 설정했습니다. 항상 다시 시도하도록 정책을 설정할 수 있습니다. 간헐적 인 네트워크 오류 (네 번째 글 머리 기호)와 같은 경우에만 유효합니다.

.withFailedInsertRetryPolicy(InsertRetryPolicy.alwaysRetry()) 

테이블 ID 형식이 잘못된 경우 (1 글 머리), 다음 CREATE_IF_NEEDED은 처분 구성이 테이블 ID가 잘못된 경우에도 데이터 흐름 작업이 자동으로 오류없이 새 테이블을 만들 수 있도록해야 만들 .

데이터 세트에 액세스 권한 문제가 있거나 데이터 세트 (두 번째 및 세 번째 글 머리글)에 액세스 권한 문제가있는 경우 스트리밍 작업이 중단되어 궁극적으로 실패합니다. 수동 개입 없이는 어떤 상황에서도 진행할 수 없습니다.

관련 문제