의 구성원이 아닙니다. Spark-shell을 사용하고 있습니다. Spark-shell을 사용하여 정서 분석을 수행하기 위해 Kafka 주제의 트윗을 저장했습니다.값 꼬리가 (문자열, 문자열)
내가 추가 한 종속 관계 : org.apache.spark : 스파크 스트리밍 kafka_2.10 : 1.6.2 edu.stanford.nlp : 스탠포드 - corenlp : 3.5.1
이가 코드입니다 이는 내가하고 있어요 :이 라인을 실행하는 동안,
import org.apache.spark._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka._
val conf = new SparkConf().setMaster("local[4]").setAppName("KafkaReceiver")
val ssc = new StreamingContext(conf, Seconds(5))
val kafkaStream = KafkaUtils.createStream(ssc, "sandbox.hortonworks.com:2181","test-consumer-group", Map("test12" -> 5))
val topCounts60 = kafkaStream.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).map { case (topic, count) => (count, topic) }.transform(_.sortByKey(false))
topCounts60.foreachRDD(rdd => {
val topList = rdd.take(10)
println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
})
kafkaStream.count().map(cnt => "Received " + cnt + " kafka messages.").print()
val wordSentimentFilePath = "hdfs://sandbox.hortonworks.com:8020/TwitterData/AFINN.txt"
val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line =>
val Array(word, happiness) = line.split("\t")
(word, happiness)
} cache()
val happiest60 = kafkaStream.map(hashTag => (hashTag.tail, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)). transform{topicCount => wordSentiments.join(topicCount)}
.map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}.map{case (topic, happinessValue) => (happinessValue, topic)}.transform(_.sortByKey(false))
ssc.start()
ssc.stop()
을하지만
val happiest60 = kafkaStream.map(hashTag => (hashTag.tail,1)).reduceByKeyAndWindow(_ + _, Seconds(60)). transform{topicCount => wordSentiments.join(topicCount)}.map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}.map{case (topic, happinessValue) => (happinessValue, topic)}.transform(_.sortByKey(false))
이 발생 오류 :
,error : value tail is not a member of (String, String)
모든 변수의 유형을 선언 해보십시오. 그것은 당신이 틀린 곳을 알아내는 데 도움이 될 수 있습니다. – Dima