0

저장소에 CSV 파일이 있으며이를 읽고 BigQuery 테이블에 기록하고 싶습니다. 이 내 코드입니다GCP 데이터 흐름 - 저장소의 CSV 파일을 읽고 BigQuery에 쓰기

GroupName,Groupcode,GroupOwner,GroupCategoryID 
System Administrators,sysadmin,13456,100 
Independence High Teachers,HS Teachers,,101 
John Glenn Middle Teachers,MS Teachers,13458,102 
Liberty Elementary Teachers,Elem Teachers,13559,103 
1st Grade Teachers,1stgrade,,104 
2nd Grade Teachers,2nsgrade,13561,105 
3rd Grade Teachers,3rdgrade,13562,106 
Guidance Department,guidance,,107 
Independence Math Teachers,HS Math,13660,108 
Independence English Teachers,HS English,13661,109 
John Glenn 8th Grade Teachers,8thgrade,,110 
John Glenn 7th Grade Teachers,7thgrade,13452,111 
Elementary Parents,Elem Parents,,112 
Middle School Parents,MS Parents,18001,113 
High School Parents,HS Parents,18002,114 

: 작업이 실행을 시작하면, 나는이 참조 1) :

public class StorgeBq { 

     public static class StringToRowConverter extends DoFn<String, TableRow> { 

      private String[] columnNames; 

      private boolean isFirstRow = true; 

      @ProcessElement 
      public void processElement(ProcessContext c) { 
       TableRow row = new TableRow(); 

       String[] parts = c.element().split(","); 

       if (isFirstRow) { 
        columnNames = Arrays.copyOf(parts, parts.length); 
        isFirstRow = false; 
       } else { 
        for (int i = 0; i < parts.length; i++) { 
         row.set(columnNames[i], parts[i]); 
        } 
        c.output(row); 
       } 
      } 
     } 

     public static void main(String[] args) { 

      DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() 
         .as(DataflowPipelineOptions.class); 
        options.setZone("europe-west1-c"); 
        options.setProject("mydata-dev"); 
        options.setRunner(DataflowRunner.class); 
        Pipeline p = Pipeline.create(options); 

      p.apply("ReadLines", TextIO.read().from("gs://mydata3-dataflow/C2ImportGroupsSample.csv")) 
      .apply("ConverToBqRow",ParDo.of(new StringToRowConverter())) 
      .apply("WriteToBq", BigQueryIO.<TableRow>writeTableRows() 
        .to("mydata-dev:DF_TEST.dataflow_table") 
        .withWriteDisposition(WriteDisposition.WRITE_APPEND) 
        .withCreateDisposition(CreateDisposition.CREATE_NEVER)); 
      p.run().waitUntilFinish(); 
     } 

} 

몇 가지 문제가 있습니다 첫 번째 줄은 헤더이고이 내 CSV 파일입니다 내 코드에서 정의하지 않은 "DropInputs"라는 프로세스! 그리고 모든 작업 전에 실행을 시작, 왜 ?? enter image description here

2) pipline이 첫 번째 작업 "ReadLines"로 시작하지 않는 이유는 무엇입니까? 3) 로그 파일에서 "WriteToBq"작업에서 데이터 중 하나를 필드로 찾으려고합니다. 예를 들어 "1st Grade Teachers"는 필드가 아니라 "GroupName"의 데이터입니다 :

"message" : "JSON parsing error in row starting at position 0: No such field: 1st Grade Teachers.", 
+0

직장 ID가 있습니까? 나는 DropInputs가 여기에 나타나야한다고 생각하지 않는다. – jkff

답변

1

코드에 몇 가지 문제점이 있습니다. 그러나 우선, "DropInputs"단계와 관련하여 안전하게 무시할 수 있습니다. 버그 보고서 this의 결과입니다. 나는 왜 그것이 표시되어야하는지 이해하지 못합니다 (많은 사용자도 혼란 스러울 것입니다). Google 직원이 그 일에 의견을 같이하고 싶습니다. 내 의견으로는 그것은 혼란 스럽다.

오른쪽, 지금 코드 :

  1. 당신은 첫 번째 행의 읽기 헤더 될 것이라고 가정한다. 이것은 잘못된 가정입니다. Dataflow는 병렬로 읽으므로 헤더 행은 언제든지 도착할 수 있습니다. boolean 플래그를 사용하여 확인하는 대신 ParDo에서 매번 string 값 자체를 확인하십시오. if (c.element.contains("GroupName") then..
  2. BigQuery 테이블 스키마가 누락되었습니다. BigQuery 싱크에 withSchema(..)을 추가해야합니다. 내 공개 파이프 라인 중 하나의 example입니다.
+0

고마워,하지만 기존 테이블에 BigQuery로 쓰고 싶다면 "withSchema (..)"를 추가하지 않고 어떻게 써야할까요? 왜냐하면 이것은 CSV 예제입니다,하지만 난 그들 각각 500 collumns 있고 항상 스키마를 추가하는 것은 쉽지 않은 여러 CSV 파일에 대해이 작업을 수행해야합니다. CSV를 읽고 BigQuery에 writting하는 예가 있습니까? – Majico

+0

CSV 파일을 BQ에 쓰는 데 성공했지만 유일한 문제는 "withSchema"를 사용하지 않고 BQ에 쓰거나 코드를 작성하는 대신 BQ에서 가져 오는 쉬운 방법입니다. – Majico

+1

이에 대해 별도의 질문을하십시오. – jkff

관련 문제