2016-09-22 4 views
1

path을 Spark Streaming에서 실행되는 saveAsTextFile 함수에 전달하고 싶습니다. 그러나 나는 java.io.NotSerializableException을 얻습니다. 보통 비슷한 경우에는 skeleton 객체를 사용하지만이 경우 문제를 해결하는 방법을 알지 못합니다. 아무도 나를 도와 줄 수 있니?Spark : java.io.NotSerializableException

import java.util 
import java.util.Properties 
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper} 
import com.fasterxml.jackson.module.scala.DefaultScalaModule 
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper 
import com.lambdaworks.jacks.JacksMapper 
import org.sedis._ 
import redis.clients.jedis._ 
import com.typesafe.config.ConfigFactory 
import kafka.consumer.{Consumer, ConsumerConfig} 
import kafka.utils.Logging 
import org.apache.log4j.{Level, Logger} 
import org.apache.spark.SparkConf 
import org.apache.spark.streaming.kafka.KafkaUtils 
import org.apache.spark.streaming.{Seconds, StreamingContext} 

class KafkaTestConsumer(val zkQuorum: String, 
         val group: String, 
         val topicMessages: String, 
         val path: String) extends Logging 
{ 

// ... 
// DStream[String] 
dstream.foreachRDD { rdd => 
    // rdd -> RDD[String], each String is a JSON 
    // Parsing each JSON 
    // splitted -> RDD[Map[String,Any]] 
    val splitted = rdd.map(line => Utils.parseJSON(line)) 
    // ... 
    splitted.saveAsTextFile(path) 
} 

} 

object Utils { 

    def parseJSON[T](json: String): Map[String,Any] = { 
    val mapper = new ObjectMapper() with ScalaObjectMapper 
    mapper.registerModule(DefaultScalaModule) 
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false) 
    mapper.readValue[Map[String,Any]](json) 
    } 
} 

전체 스택 트레이스 :

16/09/22 17시 3분 28초 오류의 Utils : 예외가 발생 java.io.NotSerializableException : org.consumer.kafka.KafkaTestConsumer 자바에서 java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509)에서 java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1548) 에서 .io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1184) 0,123,516 java.io.ObjectOutputStream.defaultWriteFields에서 java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1178) 에서 java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432) 에서(ObjectOutputStream.java:1548) java.io.ObjectOutputStream.writeObject0에서 java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432에서 java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509) ) (ObjectOutputStream.java에서 : 1178) at java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1548) java.io.ObjectOutputStream.writeObject0에서 java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432) 에서 java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509) 에서 7백45경1천5백15조5백36억9천1백36만3천2백10 (ObjectOutputStream.java:1178) java.io.ObjectOutputStream.defaultWriteFields에서 java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1174에서 java.io.ObjectOutputStream.writeArray (ObjectOutputStream.java:1378) ) (ObjectOutputStream.java에서 : 1548) at java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509) java.io.ObjectOutputStream.defaultWriteFields에서 java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1178) 에서 java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432) (ObjectOutputStream.java:1548 10 org.apache.spark.streaming.DStreamGraph $$ anonfun $의 writeObject $ 1.apply $ MCV $ SP (DStreamGraph.scala에서 java.io.ObjectOutputStream.defaultWriteObject (ObjectOutputStream.java:441에서) ) : 180) at org.apache.spark.streaming.DStreamGraph $$ anonfun $ writeObject $ 1.apply (DStreamGraph.scala : 175) at org.apache.spark.streaming.DStreamGraph $$ anonfun $ writeObject $ 1 .apply (DStreamGraph.scala : 175) at org.apache.spark.util.Utils $ .tryOrIOException (Utils.scala : 1205) at org.apache.spark.streaming.DStreamGraph.writeObject (DStreamGraph.scala : 175) at sun.reflect.NativeMethodAccessorImpl.invoke0 (네이티브 메소드) sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) at sun. 자바에서 java.io.ObjectStreamClass.invokeWriteObject (ObjectStreamClass.java:1028)에서 java.lang.reflect.Method.invoke (Method.java:498) 에서 reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) 에서 .io.ObjectOutputStream.writeSerialData java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1178)에서 java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432)에서 (ObjectOutputStream.java:1496) org.apache.spark.serializer.SerializationDebugger $ SerializationDebugger.visitSerializableWithWri에서 java.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:348) teObjectMethod (SerializationDebugger.scala : 230) : org.apache.spark.serializer.SerializationDebugger $ SerializationDebugger.visit에서 (SerializationDebugger org.apache.spark.serializer.SerializationDebugger $ SerializationDebugger.visitSerializable (189 SerializationDebugger.scala)에서 . 스칼라 108) org.apache.spark.serializer.SerializationDebugger $ SerializationDebugger.visitSerializable (SerializationDebugger.scala에서 : 108) : 206) org.apache.spark.serializer.SerializationDebugger $ SerializationDebugger.visit (SerializationDebugger.scala에서 을에서 : org.apache.spark.serializer.SerializationDebugger $의 .find (67 SerializationDebugger.scala)에서 53,691,363,210 org.apache.spark.serializer.SerializationDebugger $ .improveException (SerializationDebugger.scala : 41) : org.apache.spark에서 org.apache.spark.streaming.StreamingContext.validate (560 StreamingContext.scala)에서 . streaming.StreamingContext.liftedTree1 $ 1 (StreamingContext.scala : 601) org.apache.spark.streaming.StreamingContext.start (StreamingContext.scala 600)에서 org.consumer.kafka.KafkaDecisionsConsumer.run에서 (KafkaTestConsumer.scala : $ org.consumer.ServiceRunner .main (QueuingServiceRunner.scala 136) 20) org.consumer.ServiceRunner.main에서 (QueuingServiceRunner.scala)

01,235,
+0

전체 스택 추적을 게시 할 수 있습니까? – bear911

+0

@ bear911 : 완료. – Lobsterrrr

+0

이 DStream의 type 매개 변수는 무엇입니까? 더 많은 코드를 게시 할 수 있습니까? – bear911

답변

0

문제는 rdd 작업을 사용하고 있습니다. 작업자가 실행중인 forEach에 대해 실행중인 saveEtext forEach입니다. 위의 코드를 실행하면 작업자가 직렬화 가능한 오류 예제를 실행하고 있습니다. splitted.saveAsTextFile (경로) 그것은 당신이 그것을 좋아할 수 있도록 직렬화 오류를주는 rdd 동작입니다.

dstream.foreachRDD { rdd => 
    // rdd -> RDD[String], each String is a JSON 
    // Parsing each JSON 
    // splitted -> RDD[Map[String,Any]] 
    val splitted = rdd.map(line => Utils.parseJSON(line)) 
    // ... 
}.saveAsTextFile(path) 
+0

그것은 기호 saveAsTextFile'를 확인할 수 없습니다 '라고. – Lobsterrrr

+0

가 여기에 나는 그것이 내 경우 비슷한 할 가능성이 있음을 발견 : 나는'ssc.checkpoint을 삭제하면 http://blog.madhukaraphatak.com/handling-empty-rdd-in-spark-streaming/ – Lobsterrrr

+0

마지막을, 내 코드가 작동 ("검문소")'.왜 직렬화에 영향을 미치는지 알 수 없습니다. 이상적으로 나는이 코드 줄을 삭제하고 싶지 않습니다. – Lobsterrrr

관련 문제