2017-01-29 3 views
1

Kafka 및 Spark Streaming을 사용하여 알고리즘을 개발 중입니다. 이것은 내 수신기의 일부입니다Spark Streaming with Kafka : 빈 모음 예외

val Array(brokers, topics) = args 
val sparkConf = new SparkConf().setAppName("Traccia2014") 
val ssc = new StreamingContext(sparkConf, Seconds(10)) 

// Create direct kafka stream with brokers and topics 
val topicsSet = topics.split(",").toSet 
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet) 
val slice=30 
val lines = messages.map(_._2) 
val dStreamDst=lines.transform(rdd => { 
    val y= rdd.map(x => x.split(",")(0)).reduce((a, b) => if (a < b) a else b) 
    rdd.map(x => (((x.split(",")(0).toInt - y.toInt).toLong/slice).round*slice+" "+(x.split(",")(2)),1)).reduceByKey(_ + _) 
}) 
dStreamDst.print() 

하는 나는 다음과 같은 오류 얻을에 :

ERROR JobScheduler: Error generating jobs for time 1484927230000 ms 
java.lang.UnsupportedOperationException: empty collection 
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$42.apply(RDD.scala:1034) 
    at org.apache.spark.rdd.RDD$$anonfun$reduce$1$$anonfun$apply$42.apply(RDD.scala:1034) 

무엇을 뜻 하는가? 어떻게 해결할 수 있을까요? 도움의 모든 종류의 진정 사전에 appreciated..thanks되어

업데이트 : 를 해결. transform 또는 print() 메서드를 사용하지 마십시오. foreachRDD를 사용하는 것이 가장 좋은 해결책입니다.

답변

1

transform() API를 사용하여 DStream과 상호 작용하는이 b/c가 발생했습니다. 이 방법을 사용할 때, 시간의 데이터 스냅 샷을 나타내는 RDD가 주어지며, 10 초 동안의 경우가 있습니다. 특정 시간대에 데이터가없고 RDD가 비어있어 reduce()를 호출 할 때 "빈 콜렉션"오류가 발생하므로 코드가 실패합니다.

조작을 호출하기 전에 RDD가 비어 있지 않은지 확인하려면 rdd.isEmpty()을 사용하십시오. 나는 그것을 사용하지 않을 경우, 나는이 작업을 수행 할 수 아니라서) (변환 사용하고

lines.transform(rdd => { 
    if (rdd.isEmpty) 
    rdd 
    else { 
    // rest of transformation 
    } 
}) 
+0

: rdd.map (X => (((x.split ("," reduceByKey (_ + _) –

+0

충분히 정교합니다. "(0) .toInt - y.toInt) .toLong/slice). around * slice +" "+ (x.split (", ") (2) , 내 대답을 업데이 트했습니다. – jeff

관련 문제