0
iam이 위의 코드를 실행하는 동안 위의 오류가 발생했습니다. 직렬화 가능한 문제가 있음을 발견했지만 정확히 추적 할 수 없습니다. 하나는 내가 여기서 무엇을 할 수 있는지 설명한다. 덕분에 사전에.스레드 "main"의 예외 org.apache.spark.SparkException : 작업이 직렬화되지 않음 "
enter code here
def checkforType(json:String):String={
val parsedjson = parse(json)
val res=(parsedjson \\ "Head" \\ "Type").extract[String]
(res)
}
val dstream = KafkaUtils.createStream(ssc, zkQuorum, group, Map("topic" -> 1)).map(_._2)
val pType = dstream.map(checkforType)
pType.map(rdd => {
val pkt= rdd.toString()
if(pkt.equals("P300")) {
val t300=dstream.map(par300)
t300.print()
}else if(pkt.equals("P30")) {
val t30=dstream.map(par30)
t30.print()
}else if(pkt.equals("P6")) {
val t6=dstream.map(par6)
t6.print()
}
})