Kafka 및 Spark Streaming을 사용하여 알고리즘을 개발 중입니다. 이것은 내 수신기의 일부입니다Spark Streaming with Kafka : 빈 모음 예외
val Array(brokers, topics) = args
val sparkConf = new SparkConf().setAppName("Traccia2014")
val ssc = new StreamingContext(sparkConf, Seconds(10))
// Create direct kafka stream with brokers and topics
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
val slice=30
val lines = messages.map(_._2)
val dStreamDst=lines.transform(rdd => {
val y= rdd.map(x => x.split(",")(0)).reduce((a, b) => if (a < b) a else b)
rdd.map(x => (((x.split(",")(0).toInt - y.toInt).toLong/slice).round*slice+" "+(x.split(",")(2)),1)).reduceByKey(_ + _)
})
dStreamDst.print()
하는 나는 다음과 같은 오류 얻을에 :
가ERROR JobScheduler: Error generating jobs for time 1484927230000 ms
java.lang.UnsupportedOperationException: empty collection
at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$42.apply(RDD.scala:1034)
at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$42.apply(RDD.scala:1034)
무엇을 뜻 하는가? 어떻게 해결할 수 있을까요? 도움의 모든 종류의 진정 사전에 appreciated..thanks되어
업데이트 : 를 해결. transform 또는 print() 메서드를 사용하지 마십시오. foreachRDD를 사용하는 것이 가장 좋은 해결책입니다.
: rdd.map (X => (((x.split ("," reduceByKey (_ + _) –
충분히 정교합니다. "(0) .toInt - y.toInt) .toLong/slice). around * slice +" "+ (x.split (", ") (2) , 내 대답을 업데이 트했습니다. – jeff