2014-12-01 5 views
1

나는 단어 목록에 따라 Twitter 스트리밍 피드를 필터링하는 Spark 응용 프로그램을 작성하려고합니다. 내 목록에 약 8000 단어가 있습니다 (Twitter 필터링 API는 최대 400 단어까지만 지원합니다). 나는 들어오는 각 트윗을 단어로 토큰 화 한 다음이 단어가 내 목록에 있는지 아닌지 확인하려고합니다. 짹짹에서 단어 중 하나가 목록에 있다면, 나는 그 짹짹을 인쇄해야합니다, 그렇지 않으면 그것을 거부해야합니다.Twitter Spark Stream Filtering : 작업이 직렬화되지 않음 예외

나는이 (나는이 내 코드를 실행/테스트하는 올바른 방법 스파크 쉘에 한 번에 몇 줄 수있는 다음 코드를 붙여 복사?) 달성하기 위해 코드를 다음 작성했습니다 :

// excluding imports to keep in concise 
val consumerKey = "" // removed while posting on SOF 
val consumerSecret = "" // removed while posting on SOF 
val accessToken = "" // removed while posting on SOF 
val accessTokenSecret = "" // removed while posting on SOF 
val url = "https://stream.twitter.com/1.1/statuses/filter.json" 

val sparkConf = new SparkConf().setAppName("Twitter Sentiment Analysis") 
val sc = new SparkContext(sparkConf) 


val csvFilterWordsList = sc.textFile("<path to file>/uniq_list_8.0_sorted") 
var filterWordsList : Set[String] = Set() 
for(filterWords <- csvFilterWordsList.collect()) { 
    filterWordsList += filterWords.split(",")(0) 
} 



// Twitter Streaming 
val ssc = new JavaStreamingContext(sc,Seconds(2)) 

val conf = new ConfigurationBuilder() 
conf.setOAuthAccessToken(accessToken) 
conf.setOAuthAccessTokenSecret(accessTokenSecret) 
conf.setOAuthConsumerKey(consumerKey) 
conf.setOAuthConsumerSecret(consumerSecret) 
conf.setStreamBaseURL(url) 
conf.setSiteStreamBaseURL(url) 

val filter = Array("twitter") 

val auth = AuthorizationFactory.getInstance(conf.build()) 
val tweets : JavaReceiverInputDStream[twitter4j.Status] = TwitterUtils.createStream(ssc, auth, filter) 

object test extends Serializable { 
def similarity(tweet : twitter4j.Status, wordsList : Set[String]) : String = { 
    val tweetTokenized = tweet.getText.replaceAll("[^a-zA-Z0-9]", " ").split(" ") 
    var flag = false 
    for(word <- tweetTokenized) { 
     if(wordsList.contains(word)) { 
      flag = true 
     } 
    } 
    if(flag && tweet.getUser.getLang == "en") { 
    return (tweet.getText + "," + tweet.getUser.getLang) 
    } 
    else { 
    return "" 
    } 
} 
} 
val statuses = tweets.dstream.map(status => test.similarity(status,filterWordsList)) 

statuses.print() 
ssc.start() 

하지만 실행에 나는 다음과 같은 예외 점점 오전 :

org.apache.spark.SparkException: Task not serializable 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) 
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:436) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:59) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:64) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:66) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:68) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:70) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72) 
    at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:74) 
    at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:76) 
    at $iwC$$iwC$$iwC$$iwC.<init>(<console>:78) 
    at $iwC$$iwC$$iwC.<init>(<console>:80) 
    at $iwC$$iwC.<init>(<console>:82) 
    at $iwC.<init>(<console>:84) 
    at <init>(<console>:86) 
    at .<init>(<console>:90) 
    at .<clinit>(<console>) 
    at .<init>(<console>:7) 
    at .<clinit>(<console>) 
    at $print(<console>) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:789) 
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1062) 
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:615) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:646) 
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:610) 
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:814) 
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:859) 
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:771) 
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:616) 
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:624) 
    at org.apache.spark.repl.SparkILoop.loop(SparkILoop.scala:629) 
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:954) 
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) 
    at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:902) 
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:902) 
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:997) 
    at org.apache.spark.repl.Main$.main(Main.scala:31) 
    at org.apache.spark.repl.Main.main(Main.scala) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:606) 
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) 
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.io.NotSerializableException: twitter4j.conf.ConfigurationBuilder 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547) 
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508) 
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431) 
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177) 
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347) 
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) 
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) 
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 

을하지만 난 그냥
val statuses = tweets.dstream.map(status => status.getText)
같은 간단한 매핑을 수행 할 때잘 작동합니다.

누군가 내가 잘못하고있는 부분을 도와 줄 수 있습니까?

+0

Google 검색이 실제로 특정 검색에서 도움이되는 결과를 산출 한 것에 놀랐습니다. : D –

답변

4

Spark-shell은 익명 클래스의 코드를 캡슐화하여 코드를 직렬화하고 작업자에게 제공합니다. 때로는 무엇이 캡처되고 어떤 범위에 있는지를 아는 것은 까다로운 일입니다. 스파크 셸에서 코드를 복사/붙여 넣는 경우, 붙여 넣기하는 행의 순서와 양 (예 : :paste)은 다른 클래스 구조가됩니다.

직렬화 문제를 피하는 규칙은 @transient을 dstream 작업에 필요하지 않은 것으로 표시하는 것입니다. 이 특별한 경우에는 conf, authtweets에 일시적인 특수 효과를 추가합니다.

+0

'conf','auth'와'tweets'와 함께,이 작업을하기 위해'sparkConf','sc','ssc'와'statuses'를 임시 변수로 만들어야했습니다. 하지만, 여기서 무슨 일이 일어나고 있는지 완전히 이해하지 못합니다. 지금 세부 사항을 조사 할 것입니다. 감사 !! –

+0

메소드 대신에 함수 val을 테스트 해보십시오 :'val similarity : twitter4j.Status => Set [String]) => String = ??? 이것은 클로저 직렬화에 당신의 맥락을 너무 많이 끌어들이는 것을 피해야한다. – maasg

관련 문제