6
다음은 간단한 코드로 30 초의 창 크기와 10 초의 슬라이드 크기에 대해 단어 수를 계산합니다.스파크 스트리밍 창 작업
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.api.java.function._
import org.apache.spark.streaming.api._
import org.apache.spark.storage.StorageLevel
val ssc = new StreamingContext(sc, Seconds(5))
// read from text file
val lines0 = ssc.textFileStream("test")
val words0 = lines0.flatMap(_.split(" "))
// read from socket
val lines1 = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
val words1 = lines1.flatMap(_.split(" "))
val words = words0.union(words1)
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
wordCounts.print()
ssc.checkpoint(".")
ssc.start()
ssc.awaitTermination()
그러나, 나는이 라인에서 오류가 점점 오전 :
val wordCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(30), Seconds(10))
합니다. 특히 _ + _
에서. 오류는
51: error: missing parameter type for expanded function ((x$2, x$3) => x$2.$plus(x$3))
누구에게 어떤 문제가 있는지 말해 줄 수 있습니까? 감사!
이 감사에 설명되어 있습니다! 변경 후 프로그램에서 예상 한 결과를 제공하지만 그 동안에는 다른 오류가 발생했습니다. java.util.NoSuchElementException : key not found : 1406051860000 ms \t at scala.collection.MapLike $ class.default (MapLike.scala 228) scala.collection.AbstractMap.default에서 \t (Map.scala 58) scala.collection.mutable.HashMap.apply (HashMap.scala에서 \t : org.apache.spark.streaming에서 64) \t. dstream.ReceiverInputDStream.getReceivedBlockInfo (ReceiverInputDStream.scala : 77) 나는 어떻게 된 것일까? – user2895478
@ user2895478 나는 이것이 [Jira 티켓] (https://issues.apache.org/jira/browse/SPARK-2009)에서 온 것으로 믿는다. 문제는 1.0.1과 1.1.0에서 해결되었다. – aaronman