Cloud Pub/Sub에서 데이터를 읽고 Cloud Dataflow로 BigQuery에 쓰기를 원합니다. 각 데이터에는 데이터 자체가 저장 될 테이블 ID가 들어 있습니다.BigQueryIO.With가 던진 예외를 잡아 내고 출력에 실패한 데이터를 구하는 방법은 무엇입니까?
- 테이블 ID 형식이 잘못 :
는 BigQuery를 쓰기에 실패하는 것이 여러 요인이 있습니다.
- 데이터 집합이 존재하지 않습니다.
- 데이터 집합에서 파이프 라인에 액세스 할 수 없습니다.
- 네트워크 오류입니다.
오류 중 하나가 발생하면 스트리밍 작업이 작업을 다시 시도하고 정지합니다. 불량 데이터를 구제하고 실속을 피하기 위해 WriteResult.getFailedInserts()
을 사용해 보았지만 제대로 작동하지 않았습니다. 어떤 좋은 방법이 있습니까? 파이프 라인의 정의에 출력에 기록 할 때 예외를 포착하는 쉬운 방법이 없습니다
public class StarterPipeline {
private static final Logger LOG = LoggerFactory.getLogger(StarterPipeline.class);
public class MyData implements Serializable {
String table_id;
}
public interface MyOptions extends PipelineOptions {
@Description("PubSub topic to read from, specified as projects/<project_id>/topics/<topic_id>")
@Validation.Required
ValueProvider<String> getInputTopic();
void setInputTopic(ValueProvider<String> value);
}
public static void main(String[] args) {
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
Pipeline p = Pipeline.create(options);
PCollection<MyData> input = p
.apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getInputTopic()))
.apply("ParseJSON", MapElements.into(TypeDescriptor.of(MyData.class))
.via((String text) -> new Gson().fromJson(text, MyData.class)));
WriteResult writeResult = input
.apply("WriteToBigQuery", BigQueryIO.<MyData>write()
.to(new SerializableFunction<ValueInSingleWindow<MyData>, TableDestination>() {
@Override
public TableDestination apply(ValueInSingleWindow<MyData> input) {
MyData myData = input.getValue();
return new TableDestination(myData.table_id, null);
}
})
.withSchema(new TableSchema().setFields(new ArrayList<TableFieldSchema>() {{
add(new TableFieldSchema().setName("table_id").setType("STRING"));
}}))
.withFormatFunction(new SerializableFunction<MyData, TableRow>() {
@Override
public TableRow apply(MyData myData) {
return new TableRow().set("table_id", myData.table_id);
}
})
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withFailedInsertRetryPolicy(InsertRetryPolicy.neverRetry()));
writeResult.getFailedInserts()
.apply("LogFailedData", ParDo.of(new DoFn<TableRow, TableRow>() {
@ProcessElement
public void processElement(ProcessContext c) {
TableRow row = c.element();
LOG.info(row.get("table_id").toString());
}
}));
p.run();
}
}