저장소에 CSV 파일이 있으며이를 읽고 BigQuery 테이블에 기록하고 싶습니다. 이 내 코드입니다GCP 데이터 흐름 - 저장소의 CSV 파일을 읽고 BigQuery에 쓰기
GroupName,Groupcode,GroupOwner,GroupCategoryID
System Administrators,sysadmin,13456,100
Independence High Teachers,HS Teachers,,101
John Glenn Middle Teachers,MS Teachers,13458,102
Liberty Elementary Teachers,Elem Teachers,13559,103
1st Grade Teachers,1stgrade,,104
2nd Grade Teachers,2nsgrade,13561,105
3rd Grade Teachers,3rdgrade,13562,106
Guidance Department,guidance,,107
Independence Math Teachers,HS Math,13660,108
Independence English Teachers,HS English,13661,109
John Glenn 8th Grade Teachers,8thgrade,,110
John Glenn 7th Grade Teachers,7thgrade,13452,111
Elementary Parents,Elem Parents,,112
Middle School Parents,MS Parents,18001,113
High School Parents,HS Parents,18002,114
: 작업이 실행을 시작하면, 나는이 참조 1) :
public class StorgeBq {
public static class StringToRowConverter extends DoFn<String, TableRow> {
private String[] columnNames;
private boolean isFirstRow = true;
@ProcessElement
public void processElement(ProcessContext c) {
TableRow row = new TableRow();
String[] parts = c.element().split(",");
if (isFirstRow) {
columnNames = Arrays.copyOf(parts, parts.length);
isFirstRow = false;
} else {
for (int i = 0; i < parts.length; i++) {
row.set(columnNames[i], parts[i]);
}
c.output(row);
}
}
}
public static void main(String[] args) {
DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
.as(DataflowPipelineOptions.class);
options.setZone("europe-west1-c");
options.setProject("mydata-dev");
options.setRunner(DataflowRunner.class);
Pipeline p = Pipeline.create(options);
p.apply("ReadLines", TextIO.read().from("gs://mydata3-dataflow/C2ImportGroupsSample.csv"))
.apply("ConverToBqRow",ParDo.of(new StringToRowConverter()))
.apply("WriteToBq", BigQueryIO.<TableRow>writeTableRows()
.to("mydata-dev:DF_TEST.dataflow_table")
.withWriteDisposition(WriteDisposition.WRITE_APPEND)
.withCreateDisposition(CreateDisposition.CREATE_NEVER));
p.run().waitUntilFinish();
}
}
몇 가지 문제가 있습니다 첫 번째 줄은 헤더이고이 내 CSV 파일입니다 내 코드에서 정의하지 않은 "DropInputs"라는 프로세스! 그리고 모든 작업 전에 실행을 시작, 왜 ??
2) pipline이 첫 번째 작업 "ReadLines"로 시작하지 않는 이유는 무엇입니까? 3) 로그 파일에서 "WriteToBq"작업에서 데이터 중 하나를 필드로 찾으려고합니다. 예를 들어 "1st Grade Teachers"는 필드가 아니라 "GroupName"의 데이터입니다 :
"message" : "JSON parsing error in row starting at position 0: No such field: 1st Grade Teachers.",
직장 ID가 있습니까? 나는 DropInputs가 여기에 나타나야한다고 생각하지 않는다. – jkff