샘플 JSON에서 JavaRDD
{ "이름": "DEV", "급여"10000, "직업": "ENGG ","주소 ":"노 "} {"이름 ":"KARTHIK ","급여 "20000,"직업 ":"ENGG ","주소 ":"노 "}
유용한 코드 :
final List<Map<String,String>> jsonData = new ArrayList<>();
DataFrame df = sqlContext.read().json("file:///home/dev/data-json/emp.json");
JavaRDD<String> rdd = df.repartition(1).toJSON().toJavaRDD();
rdd.foreach(new VoidFunction<String>() {
@Override
public void call(String line) {
try {
jsonData.add (new ObjectMapper().readValue(line, Map.class));
System.out.println(Thread.currentThread().getName());
System.out.println("List size: "+jsonData.size());
} catch (IOException e) {
e.printStackTrace();
}
}
});
System.out.println(Thread.currentThread().getName());
System.out.println("List size: "+jsonData.size());
jsonData
은 결국 비어 있습니다.
출력 : (내가 테스트 한이 이것이 당신의 JSON에없는 곳의 경우를 처리하기 때문에이 값 부분 문자열이 아닌 매핑 Map<String, Object>
을 선호 https://github.com/freedev/spark-test
final ObjectMapper objectMapper = new ObjectMapper();
List<Map<String, Object>> list = rdd
.map(new org.apache.spark.api.java.function.Function<String, Map<String, Object>>() {
@Override
public Map<String, Object> call(String line) throws Exception {
TypeReference<Map<String, Object>> typeRef = new TypeReference<Map<String, Object>>() {
};
Map<String, Object> rs = objectMapper.readValue(line, typeRef);
return rs;
}
}).collect();
작동
Executor task launch worker-1
List size: 1
Executor task launch worker-1
List size: 2
Executor task launch worker-1
List size: 3
.
.
.
Executor task launch worker-1
List size: 100
main
List size: 0
목록이 처음부터 비어있는 것처럼 보이므로 객체 매퍼가 가져올 수있는 행을 구문 분석 할 수 없습니까? [mcve]를 제공 할 수 있습니까? – Thomas
'rdd '란 무엇입니까? – khelwood
아마도 foreach가 작업을 완료하기 전에 (또는 시작된 것처럼)'System.out.println'이 실행될 수 있습니까? – freedev