나는 단어 목록에 따라 Twitter 스트리밍 피드를 필터링하는 Spark 응용 프로그램을 작성하려고합니다. 내 목록에 약 8000 단어가 있습니다 (Twitter 필터링 API는 최대 400 단어까지만 지원합니다). 나는 들어오는 각 트윗을 단어로 토큰 화 한 다음이 단어가 내 목록에 있는지 아닌지 확인하려고합니다. 짹짹에서 단어 중 하나가 목록에 있다면, 나는 그 짹짹을 인쇄해야합니다, 그렇지 않으면 그것을 거부해야합니다.Twitter Spark Stream Filtering : 작업이 직렬화되지 않음 예외
나는이 (나는이 내 코드를 실행/테스트하는 올바른 방법 스파크 쉘에 한 번에 몇 줄 수있는 다음 코드를 붙여 복사?) 달성하기 위해 코드를 다음 작성했습니다 :
// excluding imports to keep in concise
val consumerKey = "" // removed while posting on SOF
val consumerSecret = "" // removed while posting on SOF
val accessToken = "" // removed while posting on SOF
val accessTokenSecret = "" // removed while posting on SOF
val url = "https://stream.twitter.com/1.1/statuses/filter.json"
val sparkConf = new SparkConf().setAppName("Twitter Sentiment Analysis")
val sc = new SparkContext(sparkConf)
val csvFilterWordsList = sc.textFile("<path to file>/uniq_list_8.0_sorted")
var filterWordsList : Set[String] = Set()
for(filterWords <- csvFilterWordsList.collect()) {
filterWordsList += filterWords.split(",")(0)
}
// Twitter Streaming
val ssc = new JavaStreamingContext(sc,Seconds(2))
val conf = new ConfigurationBuilder()
conf.setOAuthAccessToken(accessToken)
conf.setOAuthAccessTokenSecret(accessTokenSecret)
conf.setOAuthConsumerKey(consumerKey)
conf.setOAuthConsumerSecret(consumerSecret)
conf.setStreamBaseURL(url)
conf.setSiteStreamBaseURL(url)
val filter = Array("twitter")
val auth = AuthorizationFactory.getInstance(conf.build())
val tweets : JavaReceiverInputDStream[twitter4j.Status] = TwitterUtils.createStream(ssc, auth, filter)
object test extends Serializable {
def similarity(tweet : twitter4j.Status, wordsList : Set[String]) : String = {
val tweetTokenized = tweet.getText.replaceAll("[^a-zA-Z0-9]", " ").split(" ")
var flag = false
for(word <- tweetTokenized) {
if(wordsList.contains(word)) {
flag = true
}
}
if(flag && tweet.getUser.getLang == "en") {
return (tweet.getText + "," + tweet.getUser.getLang)
}
else {
return ""
}
}
}
val statuses = tweets.dstream.map(status => test.similarity(status,filterWordsList))
statuses.print()
ssc.start()
을
하지만 실행에 나는 다음과 같은 예외 점점 오전 :
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242)
at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:436)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:70)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74)
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76)
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:78)
at $iwC$$iwC$$iwC.<init>(<console>:80)
at $iwC$$iwC.<init>(<console>:82)
at $iwC.<init>(<console>:84)
at <init>(<console>:86)
at .<init>(<console>:90)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771)
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616)
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624)
at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997)
at org.apache.spark.repl.Main$.main(Main.scala:31)
at org.apache.spark.repl.Main.main(Main.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: twitter4j.conf.ConfigurationBuilder
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
을하지만 난 그냥
val statuses = tweets.dstream.map(status => status.getText)
같은 간단한 매핑을 수행 할 때잘 작동합니다.
누군가 내가 잘못하고있는 부분을 도와 줄 수 있습니까?
Google 검색이 실제로 특정 검색에서 도움이되는 결과를 산출 한 것에 놀랐습니다. : D –