2017-02-01 3 views
2

을 통해 데이터 흐름과 클라우드 스토리지에 중첩되지 JSON 파일을 읽을 수 있습니다 : 난 그냥 내가 할 수있는 BigQuery에 최소한의 필터링 이러한 로그를 작성하려면읽기 구글 데이터 흐름 /에서 JSON을 중첩 아파치 빔

p.apply("read logfiles", TextIO.Read.from("gs://bucket/*").withCoder(TableRowJsonCoder.of())); 

그래서이 같은 DoFn을 사용하여 :

private static class Formatter extends DoFn<TableRow,TableRow> { 

     @Override 
     public void processElement(ProcessContext c) throws Exception { 

      // .clone() since input is immutable 
      TableRow output = c.element().clone(); 

      // remove misleading timestamp field 
      output.remove("@timestamp"); 

      // set timestamp field by using the element's timestamp 
      output.set("timestamp", c.timestamp().toString()); 

      c.output(output); 
     } 
    } 
} 

그러나 JSON 파일에서 중첩 된 필드에이 방법으로 액세스하는 방법을 알지 못합니다.

  1. TableRow 포함하면 RECORDr라는 그것이 상기 직렬화/역 직렬화없이 키/값에 액세스 할 수있다? 내가 직렬화해야하는 경우
  2. /그것이 내가이 길을 잃어 성능 뒷면의 일부를 얻기 위해하는 표준 Coder 대신 TableRowJsonCoderTextIO.Read의를 사용하는 것이 더 의미가 않습니다는 Jackson 라이브러리와 자신을 역 직렬화?

편집

파일이 구분 된 새로운 라인이며, 다음과 같이 보일 :

{"@timestamp":"2015-x", "message":"bla", "r":{"analyzed":"blub", "query": {"where":"9999"}}} 
{"@timestamp":"2015-x", "message":"blub", "r":{"analyzed":"bla", "query": {"where":"1111"}}} 
+0

파일 형식은 어떻게됩니까? 그것들은 개행 문자로 구분 되나요, 아니면 중첩 된 레코드 중 하나에 개행 문자가 나타날 수 있습니까? –

+0

파일은 개행 문자로 구분되며 중첩 된 레코드 중 하나에서 개행을 기대하지 않습니다. 내 질문에 예를 들어 편집했습니다. – Tobi

답변

4

가장 좋은 방법은 잭슨이 # 2에 설명 된 일을하고 사용하는 아마 직접. TextIO는 문자열 코더를 사용하여 파일에서 행을 읽은 다음, 요소를 실제로 구문 분석하기 위해 DoFn을 사용하여 TextIO가 읽은 내용을 읽도록하는 것이 가장 좋습니다. 다음과 같은 것 :

PCollection<String> lines = pipeline 
    .apply(TextIO.from("gs://bucket/...")); 
PCollection<TableRow> objects = lines 
    .apply(ParDo.of(new DoFn<String, TableRow>() { 
    @Override 
    public void processElement(ProcessContext c) { 
     String json = c.element(); 
     SomeObject object = /* parse json using Jackson, etc. */; 
     TableRow row = /* create a table row from object */; 
     c.output(row); 
    } 
    }); 

여러 개의 ParDos를 사용하여이 작업을 수행 할 수 있습니다.

+0

결국 여러 개의 ParDos를 사용하여 해결했습니다. 감사합니다. – Tobi

관련 문제