2

나는 스파크 엔진에서 스파크 스트리밍을 배우기 시작했으며 데이터 분석과 스파크를 처음 접했다. 나는 미래의 데이터를 예측하기 위해 작은 IOT 애플리케이션을 만들고 싶다.자바 스파크 스트리밍 JSON 파싱

는 I 데이터가 배치되는 유닉스 타임 스탬프 t이 다음과 같이, 실시간 센서 JSON 데이터를 전송 TIVA 하드웨어

[{"t":1478091719000,"sensors":[{"s":"s1","d":"+253.437"},{"s":"s2","d":"+129.750"},{"s":"s3","d":"+45.500"},{"s":"s4","d":"+255.687"},{"s":"s5","d":"+290.062"},{"s":"s6","d":"+281.500"},{"s":"s7","d":"+308.250"},{"s":"s8","d":"+313.812"}]}] 

이다있다. 센서는 각 센서 ('s') 데이터가 'd'인 센서 배열입니다.

내가 원하는 것은이 데이터를 소비하고 spark-streaming하는 객체를 만든 다음 spark의 Mlib (기계 학습) 또는 동급 라이브러리를 통해 모든 데이터를 전달하여 향후 데이터를 예측합니다.

나는 이것이 내가 사용하기로 결정 모든 기술 선택

  1. 와 가능 여부를 일반적인 생각을 줄까?
  2. 중첩 된 JSON을 어떻게 소비 할 수 있습니까? 나는 SQLContext를 사용해 보았지만 성공하지 못했다.
  3. 여기에서 내가하려고하는 것을 달성하기위한 일반적인 지침.

다음은 KAFKA의 메시지를 사용하는 코드입니다.

SparkConf conf = new SparkConf().setAppName("DattusSpark").setMaster("local[2]"); 

    JavaSparkContext sc = new JavaSparkContext(conf); 

    JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000)); 

    // TODO: processing pipeline 
    Map<String, String> kafkaParams = new HashMap<String, String>(); 
    kafkaParams.put("metadata.broker.list", "kafkaserver_address:9092"); 
    Set<String> topics = Collections.singleton("RAH"); 


    JavaPairInputDStream<String, String> directKafkaStream = 
      KafkaUtils.createDirectStream(ssc, String.class, String.class, StringDecoder.class, 
        StringDecoder.class, kafkaParams, topics); 


    JavaDStream<String> json = directKafkaStream.map(new Function<Tuple2<String,String>, String>() { 
     public String call(Tuple2<String,String> message) throws Exception { 
      System.out.println(message._2()); 
      return message._2(); 
     }; 
    }); 


    System.out.println(" json is 0------ 0"+ json); 



    json.foreachRDD(rdd -> { 
     rdd.foreach(
       record -> System.out.println(record)); 
    }); 

    ssc.start(); 
    ssc.awaitTermination(); 

추신 : 나는 선형성과 성능을 유지하기 위해 Java에서 이것을하고 싶습니다. 질문에 대한

+0

당신은 지금까지 뭘하려 코드를 게시 할 수 (스파크 SQL과 같은 ML)를 어떤 점화 모듈을 사용할 수 있습니까? Spark SQL 및 스트리밍을 사용하여 가능합니다. – Shankar

+0

문제의 게시 된 코드. –

+0

'sqlContext'로 json 문자열을 읽으려고 할 때, 당신이 직면 한 문제는 무엇입니까? 작업 직렬화 할 수없는 문제입니까? – Shankar

답변

2

답변 :

1)이 내가 사용하기로 결정 모든 기술 선택 가능할지 여부?

`Ans: Yes it can be done and quiet a normal use-case for spark.` 

2) 중첩 된 JSON은 어떻게 사용합니까? 나는 SQLContext를 사용해 보았지만 성공하지 못했다.

`Ans: Nested JSON with SQLContext is little tricky. You may want to use Jackson or some other JSON library.` 

3) 여기서 수행하려고하는 것을 달성하기위한 일반 지침. 다른 기계 학습 알고리즘 또는 제 3 자의 라이브러리를 사용하려면

Ans: Consuming messages through kafka seems fine, but only a limited machine learning algorithms are supported through streaming.

은, 아마 당신은 마지막에 모델을 emmiting 일괄 작업으로 모델 생성을 고려해야합니다. 스트리밍 작업은 모델을로드하고 데이터 스트림을 가져와 예측 만해야합니다.

+0

이와 같은 사용 사례에 대한 적절한 문서로 안내 할 수 있습니까? 매우 도움이 될 것입니다 –

4

당신이 SparkSession에서, 스파크 2.0을 사용하고 있기 때문에, 당신은 JSON

json.foreachRDD(rdd -> { 

     DataFrame df= spark.read.json(rdd) 
     //process json with this DF. 
} 

를 읽을 수 있습니다 또는 당신은 당신이 createDataFrame 방법을 사용할 수, 행의 RDD에 RDD을 변환 할 수 있습니다.

json.foreachRDD(rdd -> { 

      DataFrame df= spark.createDataFrame(rdd); 
      //process json with this DF. 
    } 

DF에서 중첩 된 JSON 처리가 가능하므로 this 문서를 참조하십시오.당신이 DF로 JSON 변환하면

또한, 당신은

+0

제 경우 SQLContext 생성자에서 사용을 시도했으나 더 이상 사용되지 않았습니다. 그리고 'JavaSparkContext'를 사용하여 'sc'(SparkContext)를 얻는 방법을 얻지 못했습니다. –

+0

@RahulBorkar : SQLContext (javasparkContext) – Shankar

+0

에 JavaSparkContext를 전달할 수 있습니다. Spark 2.11에서는 더 이상 사용되지 않습니다. 또한, 귀하의 코드를 시도 할 때 "메소드 변환 (함수 , JavaRDD >) 유형이 모호합니다. JavaDStream " –