2014-07-22 5 views
6

다음은 간단한 코드로 30 초의 창 크기와 10 초의 슬라이드 크기에 대해 단어 수를 계산합니다.스파크 스트리밍 창 작업

import org.apache.spark.SparkConf 
import org.apache.spark.streaming._ 
import org.apache.spark.streaming.StreamingContext._ 
import org.apache.spark.api.java.function._ 
import org.apache.spark.streaming.api._ 
import org.apache.spark.storage.StorageLevel 

val ssc = new StreamingContext(sc, Seconds(5)) 

// read from text file 
val lines0 = ssc.textFileStream("test") 
val words0 = lines0.flatMap(_.split(" ")) 

// read from socket 
val lines1 = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER) 
val words1 = lines1.flatMap(_.split(" ")) 

val words = words0.union(words1) 
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10)) 

wordCounts.print() 
ssc.checkpoint(".") 
ssc.start() 
ssc.awaitTermination() 

그러나, 나는이 라인에서 오류가 점점 오전 :

val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10)) 

합니다. 특히 _ + _에서. 오류는

51: error: missing parameter type for expanded function ((x$2, x$3) => x$2.$plus(x$3)) 

누구에게 어떤 문제가 있는지 말해 줄 수 있습니까? 감사!

답변

10

이것은 매우 쉽게 해결할 수 있으며 유형을 명시해야합니다. 이 경우 유형을 추론 할 수
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow((a:Int,b:Int)=>a+b, Seconds(30), Seconds(10))

이유 스칼라는 this answer

+0

이 감사에 설명되어 있습니다! 변경 후 프로그램에서 예상 한 결과를 제공하지만 그 동안에는 다른 오류가 발생했습니다. java.util.NoSuchElementException : key not found : 1406051860000 ms \t at scala.collection.MapLike $ class.default (MapLike.scala 228) scala.collection.AbstractMap.default에서 \t (Map.scala 58) scala.collection.mutable.HashMap.apply (HashMap.scala에서 \t : org.apache.spark.streaming에서 64) \t. dstream.ReceiverInputDStream.getReceivedBlockInfo (ReceiverInputDStream.scala : 77) 나는 어떻게 된 것일까? – user2895478

+0

@ user2895478 나는 이것이 [Jira 티켓] (https://issues.apache.org/jira/browse/SPARK-2009)에서 온 것으로 믿는다. 문제는 1.0.1과 1.1.0에서 해결되었다. – aaronman

관련 문제