나는 spark 스트리밍 응용 프로그램에서 json 속성의 최소값, 최대 값을 인쇄해야하는데 그 속성은 2 초짜리 슬라이딩 윈도우로 매 20 초마다 max를 인쇄해야합니다 . 기본적으로 (POC 용) 작업 그룹 sparkContext의 Spark UI에서 min, max를 인쇄하려고합니다.Spark Streaming Windowing 출력
SetJobGroup ("count-min-max", "count-min-max value of quality attribute").
이것은 20 초마다 스파크 UI 디스플레이에 표시되어야합니다.
아래 코드는 min, max, count를 얻을 수있는 코드이지만 인쇄는 매 2 초마다 실행되며, 배치 간격은 20 초가 아닙니다.
val ssc = new StreamingContext(sparkContext, Seconds(2))
val record = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicMap, StorageLevel.MEMORY_ONLY_SER_2)
val lines=record.map(_._2)
//val jsonCounts=lines.map { jsonRecord => parseJson(jsonRecord) }.map { x => x.mkString("\n") }.print
val valueDtsream:DStream[Array[Double]]=lines.map { jsonRecord => parseJson(jsonRecord) }
.window(Seconds(20),Seconds(2))
valueDtsream.foreachRDD
{
rdd =>
if (!rdd.partitions.isEmpty)
{
val stats = rdd.flatMap(x => x)
println(stats.count().toString()+"-"+stats.min().toString()+"-"+stats.max().toString)
}
}
ssc.start()
ssc.awaitTermination()
무엇이 문제입니까? –
스트리밍 배치 시간이 2 초인 반면 min, max, 매 20 초마다 계산하고 싶습니다. – nilesh1212