path
를 Spark Streaming에서 실행되는 saveAsTextFile
함수에 전달하고 싶습니다. 그러나 java.io.NotSerializableException
이 발생합니다. 보통 비슷한 경우에는 skeleton 객체를 사용하지만, 이 경우에는 문제를 해결하는 방법을 모르겠습니다. 누가 도와주실 수 있나요? — Spark : java.io.NotSerializableException
import java.util
import java.util.Properties
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
import com.lambdaworks.jacks.JacksMapper
import org.sedis._
import redis.clients.jedis._
import com.typesafe.config.ConfigFactory
import kafka.consumer.{Consumer, ConsumerConfig}
import kafka.utils.Logging
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
class KafkaTestConsumer(val zkQuorum: String,
val group: String,
val topicMessages: String,
val path: String) extends Logging
{
// ...
// FIX: referencing the field `path` inside the foreachRDD closure captures
// `this` — the whole KafkaTestConsumer, which is not serializable (it mixes
// in kafka.utils.Logging and holds non-serializable state). That is exactly
// the java.io.NotSerializableException in the stack trace. Copying the field
// into a local val means the closure only captures a plain String.
val outputPath = path
// DStream[String]
dstream.foreachRDD { rdd =>
// rdd -> RDD[String], each String is a JSON document
// Parse each JSON line into a Map
// splitted -> RDD[Map[String,Any]]
val splitted = rdd.map(line => Utils.parseJSON(line))
// ...
// Only `outputPath` (a String) is serialized with this action, not `this`.
splitted.saveAsTextFile(outputPath)
}
}
object Utils {
// Build the mapper once: ObjectMapper construction/configuration is
// expensive, and the instance is thread-safe after configuration, so
// recreating it on every parseJSON call (as the original did) wastes
// work on every record. Living inside an `object`, it is resolved on the
// executor side and never captured by a Spark closure, so it cannot
// contribute to serialization problems.
private val mapper: ObjectMapper with ScalaObjectMapper = {
  val m = new ObjectMapper() with ScalaObjectMapper
  m.registerModule(DefaultScalaModule)
  m.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
  m
}

/** Parses a JSON document into a key/value Map.
  *
  * Unknown properties are ignored (FAIL_ON_UNKNOWN_PROPERTIES is disabled).
  *
  * @param json a JSON object rendered as a String
  * @return the parsed fields; nested JSON objects become nested Maps
  * @note the type parameter T is unused — kept only so existing callers
  *       that pass an explicit type argument keep compiling.
  */
def parseJSON[T](json: String): Map[String,Any] =
  mapper.readValue[Map[String,Any]](json)
}
전체 스택 트레이스 :
01,235,16/09/22 17시 3분 28초 오류의 Utils : 예외가 발생 java.io.NotSerializableException : org.consumer.kafka.KafkaTestConsumer 자바에서 java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509)에서 java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1548) 에서 .io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1184) 0,123,516 java.io.ObjectOutputStream.defaultWriteFields에서 java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1178) 에서 java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432) 에서(ObjectOutputStream.java:1548) java.io.ObjectOutputStream.writeObject0에서 java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432에서 java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509) ) (ObjectOutputStream.java에서 : 1178) at java.io.ObjectOutputStream.defaultWriteFields (ObjectOutputStream.java:1548) java.io.ObjectOutputStream.writeObject0에서 java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432) 에서 java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509) 에서 7백45경1천5백15조5백36억9천1백36만3천2백10 (ObjectOutputStream.java:1178) java.io.ObjectOutputStream.defaultWriteFields에서 java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1174에서 java.io.ObjectOutputStream.writeArray (ObjectOutputStream.java:1378) ) (ObjectOutputStream.java에서 : 1548) at java.io.ObjectOutputStream.writeSerialData (ObjectOutputStream.java:1509) java.io.ObjectOutputStream.defaultWriteFields에서 java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1178) 에서 java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432) (ObjectOutputStream.java:1548 10 org.apache.spark.streaming.DStreamGraph $$ anonfun $의 writeObject $ 1.apply $ MCV $ SP (DStreamGraph.scala에서 java.io.ObjectOutputStream.defaultWriteObject (ObjectOutputStream.java:441에서) ) : 180) at org.apache.spark.streaming.DStreamGraph $$ anonfun $ writeObject $ 1.apply (DStreamGraph.scala : 175) 
at org.apache.spark.streaming.DStreamGraph $$ anonfun $ writeObject $ 1 .apply (DStreamGraph.scala : 175) at org.apache.spark.util.Utils $ .tryOrIOException (Utils.scala : 1205) at org.apache.spark.streaming.DStreamGraph.writeObject (DStreamGraph.scala : 175) at sun.reflect.NativeMethodAccessorImpl.invoke0 (네이티브 메소드) sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) at sun. 자바에서 java.io.ObjectStreamClass.invokeWriteObject (ObjectStreamClass.java:1028)에서 java.lang.reflect.Method.invoke (Method.java:498) 에서 reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43) 에서 .io.ObjectOutputStream.writeSerialData java.io.ObjectOutputStream.writeObject0 (ObjectOutputStream.java:1178)에서 java.io.ObjectOutputStream.writeOrdinaryObject (ObjectOutputStream.java:1432)에서 (ObjectOutputStream.java:1496) org.apache.spark.serializer.SerializationDebugger $ SerializationDebugger.visitSerializableWithWri에서 java.io.ObjectOutputStream.writeObject (ObjectOutputStream.java:348) teObjectMethod (SerializationDebugger.scala : 230) : org.apache.spark.serializer.SerializationDebugger $ SerializationDebugger.visit에서 (SerializationDebugger org.apache.spark.serializer.SerializationDebugger $ SerializationDebugger.visitSerializable (189 SerializationDebugger.scala)에서 . 스칼라 108) org.apache.spark.serializer.SerializationDebugger $ SerializationDebugger.visitSerializable (SerializationDebugger.scala에서 : 108) : 206) org.apache.spark.serializer.SerializationDebugger $ SerializationDebugger.visit (SerializationDebugger.scala에서 을에서 : org.apache.spark.serializer.SerializationDebugger $의 .find (67 SerializationDebugger.scala)에서 53,691,363,210 org.apache.spark.serializer.SerializationDebugger $ .improveException (SerializationDebugger.scala : 41) : org.apache.spark에서 org.apache.spark.streaming.StreamingContext.validate (560 StreamingContext.scala)에서 . 
streaming.StreamingContext.liftedTree1 $ 1 (StreamingContext.scala : 601) org.apache.spark.streaming.StreamingContext.start (StreamingContext.scala 600)에서 org.consumer.kafka.KafkaDecisionsConsumer.run에서 (KafkaTestConsumer.scala : $ org.consumer.ServiceRunner .main (QueuingServiceRunner.scala 136) 20) org.consumer.ServiceRunner.main에서 (QueuingServiceRunner.scala)
전체 스택 트레이스를 게시할 수 있습니까? – bear911
@bear911: 완료했습니다. – Lobsterrrr
이 DStream의 타입 매개변수는 무엇입니까? 코드를 더 게시할 수 있습니까? – bear911