나는 스파크 엔진에서 스파크 스트리밍을 배우기 시작했으며 데이터 분석과 스파크를 처음 접했다. 나는 미래의 데이터를 예측하기 위해 작은 IOT 애플리케이션을 만들고 싶다.자바 스파크 스트리밍 JSON 파싱
는 I 데이터가 배치되는 유닉스 타임 스탬프 t이 다음과 같이, 실시간 센서 JSON 데이터를 전송 TIVA 하드웨어
[{"t":1478091719000,"sensors":[{"s":"s1","d":"+253.437"},{"s":"s2","d":"+129.750"},{"s":"s3","d":"+45.500"},{"s":"s4","d":"+255.687"},{"s":"s5","d":"+290.062"},{"s":"s6","d":"+281.500"},{"s":"s7","d":"+308.250"},{"s":"s8","d":"+313.812"}]}]
이다있다. 센서는 각 센서 ('s') 데이터가 'd'인 센서 배열입니다.
내가 원하는 것은이 데이터를 소비하고 spark-streaming하는 객체를 만든 다음 spark의 Mlib (기계 학습) 또는 동급 라이브러리를 통해 모든 데이터를 전달하여 향후 데이터를 예측합니다.
나는 이것이 내가 사용하기로 결정 모든 기술 선택- 와 가능 여부를 일반적인 생각을 줄까?
- 중첩 된 JSON을 어떻게 소비 할 수 있습니까? 나는 SQLContext를 사용해 보았지만 성공하지 못했다.
- 여기에서 내가하려고하는 것을 달성하기위한 일반적인 지침.
다음은 KAFKA의 메시지를 사용하는 코드입니다.
SparkConf conf = new SparkConf().setAppName("DattusSpark").setMaster("local[2]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
// TODO: processing pipeline
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("metadata.broker.list", "kafkaserver_address:9092");
Set<String> topics = Collections.singleton("RAH");
JavaPairInputDStream<String, String> directKafkaStream =
KafkaUtils.createDirectStream(ssc, String.class, String.class, StringDecoder.class,
StringDecoder.class, kafkaParams, topics);
JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String,String>, String>() {
public String call(Tuple2<String,String> message) throws Exception {
System.out.println(message._2());
return message._2();
};
});
System.out.println(" json is 0------ 0"+ json);
json.foreachRDD(rdd -> {
rdd.foreach(
record -> System.out.println(record));
});
ssc.start();
ssc.awaitTermination();
추신 : 나는 선형성과 성능을 유지하기 위해 Java에서 이것을하고 싶습니다. 질문에 대한
당신은 지금까지 뭘하려 코드를 게시 할 수 (스파크 SQL과 같은 ML)를 어떤 점화 모듈을 사용할 수 있습니까? Spark SQL 및 스트리밍을 사용하여 가능합니다. – Shankar
문제의 게시 된 코드. –
'sqlContext'로 json 문자열을 읽으려고 할 때, 당신이 직면 한 문제는 무엇입니까? 작업 직렬화 할 수없는 문제입니까? – Shankar