2017-04-24 1 views
2

샘플 JSON에서 JavaRDD .foreach ( 총 100 개 개의 레코드) 후 : ArrayList를이 비어 불꽃

{ "이름": "DEV", "급여"10000, "직업": "ENGG ","주소 ":"노 "} {"이름 ":"KARTHIK ","급여 "20000,"직업 ":"ENGG ","주소 ":"노 "}

유용한 코드 :

final List<Map<String,String>> jsonData = new ArrayList<>(); 

    DataFrame df = sqlContext.read().json("file:///home/dev/data-json/emp.json"); 
    JavaRDD<String> rdd = df.repartition(1).toJSON().toJavaRDD(); 

    rdd.foreach(new VoidFunction<String>() { 
     @Override 
     public void call(String line) { 
      try { 
       jsonData.add (new ObjectMapper().readValue(line, Map.class)); 
       System.out.println(Thread.currentThread().getName()); 
       System.out.println("List size: "+jsonData.size()); 
      } catch (IOException e) { 
       e.printStackTrace(); 
      } 
     } 
    }); 

    System.out.println(Thread.currentThread().getName()); 
    System.out.println("List size: "+jsonData.size()); 

jsonData은 결국 비어 있습니다.

출력 : (내가 테스트 한이 이것이 당신의 JSON에없는 곳의 경우를 처리하기 때문에이 값 부분 문자열이 아닌 매핑 ​​Map<String, Object>을 선호 https://github.com/freedev/spark-test

final ObjectMapper objectMapper = new ObjectMapper(); 

List<Map<String, Object>> list = rdd 
     .map(new org.apache.spark.api.java.function.Function<String, Map<String, Object>>() { 
      @Override 
      public Map<String, Object> call(String line) throws Exception { 
       TypeReference<Map<String, Object>> typeRef = new TypeReference<Map<String, Object>>() { 
       }; 
       Map<String, Object> rs = objectMapper.readValue(line, typeRef); 
       return rs; 
      } 
     }).collect(); 

작동

Executor task launch worker-1 
List size: 1 
Executor task launch worker-1 
List size: 2 
Executor task launch worker-1 
List size: 3 
. 
. 
. 
Executor task launch worker-1 
List size: 100 

main 
List size: 0 
+1

목록이 처음부터 비어있는 것처럼 보이므로 객체 매퍼가 가져올 수있는 행을 구문 분석 할 수 없습니까? [mcve]를 제공 할 수 있습니까? – Thomas

+1

'rdd '란 무엇입니까? – khelwood

+2

아마도 foreach가 작업을 완료하기 전에 (또는 시작된 것처럼)'System.out.println'이 실행될 수 있습니까? – freedev

답변

1

"salary":20000).

+1

질문에 'java-7' 태그가 붙어 있습니다. Java 8 코드는 도움이되지 않습니다. – khelwood

+0

@khelwood 감사합니다. – freedev

+0

@freedev 노력에 감사드립니다. 나는 그것을 시도했지만 예외를 메인 "org.apache.spark.SparkException : 직렬화 할 수 없다"(org.apache.spark.util.ClosureCleaner $ .ensureSerializable (ClosureCleaner.scala : 304)를 추가 한 후에도 \t의 작업을 직렬화 할 수 없음) 'main 함수를 구현하는 메인 클래스에'Serializable'을 구현했습니다. –