2016-11-21 2 views
0

나는 spark 스트리밍 응용 프로그램에서 json 속성의 최소값, 최대 값을 인쇄해야하는데 그 속성은 2 초짜리 슬라이딩 윈도우로 매 20 초마다 max를 인쇄해야합니다 . 기본적으로 (POC 용) 작업 그룹 sparkContext의 Spark UI에서 min, max를 인쇄하려고합니다.Spark Streaming Windowing 출력

SetJobGroup ("count-min-max", "count-min-max value of quality attribute"). 

이것은 20 초마다 스파크 UI 디스플레이에 표시되어야합니다.

아래 코드는 min, max, count를 얻을 수있는 코드이지만 인쇄는 매 2 초마다 실행되며, 배치 간격은 20 초가 아닙니다.

val ssc = new StreamingContext(sparkContext, Seconds(2)) 

val record = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY_SER_2) 
val lines=record.map(_._2) 

     //val jsonCounts=lines.map { jsonRecord => parseJson(jsonRecord) }.map { x => x.mkString("\n") }.print 

     val valueDtsream:DStream[Array[Double]]=lines.map { jsonRecord => parseJson(jsonRecord) } 
          .window(Seconds(20),Seconds(2)) 

     valueDtsream.foreachRDD 
     { 
     rdd => 
      if (!rdd.partitions.isEmpty) 
      { 
      val stats = rdd.flatMap(x => x) 
      println(stats.count().toString()+"-"+stats.min().toString()+"-"+stats.max().toString) 
      } 
     } 

     ssc.start() 
     ssc.awaitTermination() 
+0

무엇이 문제입니까? –

+0

스트리밍 배치 시간이 2 초인 반면 min, max, 매 20 초마다 계산하고 싶습니다. – nilesh1212

답변

0

난 당신이 slideIntervalwindowLength 사이의 혼란 생각합니다. window(windowLength, slideInterval)에서 :

  1. windowLength 데이터의 많은 간격이 창은 계산을 위해 고려하는 방법을 의미하는 윈도우의 길이이다.
  2. slideInterval은 창 계산이 완료된 후 창 이동 간격을 나타냅니다.

질문을 올바르게 이해하면 .window(Seconds(x),Seconds(20))으로 편집해야합니다.

+0

나는 이것에 대해 새 창 (windowLength, slideInterval) 함수에 대한 설명을 도와 주시겠습니까? 작동 원리 – nilesh1212

+0

질문에 "20 초 간격으로 카운트", "스트리밍 배치 시간은 2 초"로 지정했습니다. 그러나 당신의 창 길이는 무엇입니까? – vdep

+0

님이 대답을 편집했습니다 – vdep

관련 문제