2017-10-03 2 views
-2

의 구성원이 아닙니다. Spark-shell을 사용하고 있습니다. Spark-shell을 사용하여 정서 분석을 수행하기 위해 Kafka 주제의 트윗을 저장했습니다.값 꼬리가 (문자열, 문자열)

내가 추가 한 종속 관계 : org.apache.spark : 스파크 스트리밍 kafka_2.10 : 1.6.2 edu.stanford.nlp : 스탠포드 - corenlp : 3.5.1

이가 코드입니다 이는 내가하고 있어요 :이 라인을 실행하는 동안,

import org.apache.spark._ 
import org.apache.spark.streaming.StreamingContext 
import org.apache.spark.streaming.Seconds 
import org.apache.spark.streaming.kafka._ 
val conf = new SparkConf().setMaster("local[4]").setAppName("KafkaReceiver") 
val ssc = new StreamingContext(conf, Seconds(5)) 
val kafkaStream = KafkaUtils.createStream(ssc, "sandbox.hortonworks.com:2181","test-consumer-group", Map("test12" -> 5)) 
val topCounts60 = kafkaStream.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).map { case (topic, count) => (count, topic) }.transform(_.sortByKey(false)) 
    topCounts60.foreachRDD(rdd => { 
     val topList = rdd.take(10) 
     println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count())) 
     topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) } 
    }) 
kafkaStream.count().map(cnt => "Received " + cnt + " kafka messages.").print() 
val wordSentimentFilePath = "hdfs://sandbox.hortonworks.com:8020/TwitterData/AFINN.txt" 
    val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line => 
    val Array(word, happiness) = line.split("\t") 
    (word, happiness) 
    } cache() 
val happiest60 = kafkaStream.map(hashTag => (hashTag.tail, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)). transform{topicCount => wordSentiments.join(topicCount)} 
       .map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}.map{case (topic, happinessValue) => (happinessValue, topic)}.transform(_.sortByKey(false)) 
ssc.start() 
ssc.stop() 

을하지만

val happiest60 = kafkaStream.map(hashTag => (hashTag.tail,1)).reduceByKeyAndWindow(_ + _, Seconds(60)). transform{topicCount => wordSentiments.join(topicCount)}.map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}.map{case (topic, happinessValue) => (happinessValue, topic)}.transform(_.sortByKey(false)) 

이 발생 오류 :

,

error : value tail is not a member of (String, String)

+0

모든 변수의 유형을 선언 해보십시오. 그것은 당신이 틀린 곳을 알아내는 데 도움이 될 수 있습니다. – Dima

답변

0

아마도 hashTag 유형이 (String, String)이므로 꼬리 연산이 정의되어 있지 않습니다. tail은 튜플이 아닌 콜렉션에 정의 된 함수입니다.

map 작업은 스트림에서받은 단일 항목에 대해 작동합니다. 카프카 스트림에 (String, String) 유형의 항목이 포함되어 있으면 정상입니다.

+0

답변 해 주셔서 감사합니다 .... – Priyal