비정형적인 문제가 있습니다. kafka에서받은 rdd 처리를 시도 할 때 sparkContext에 액세스하려고하면 예외 (java.lang.NullPointerException)가 발생합니다. 그러나스파크 스트리밍 손실 SparkContext
val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER)
val sqlContext = new SQLContext(ssc.sparkContext)
receiver.foreachRDD { rdd =>
log.info("Received RDD attempt")
if (!rdd.isEmpty()) {
rdd.foreach(a => rddProcessor.processingRDD(a, sqlContext))
}
나는 단지 첫째 RDD을 처리 할 때 너무 이유, 문제가
val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER)
val sqlContext = new SQLContext(ssc.sparkContext)
receiver.foreachRDD { rdd =>
log.info("Received RDD attempt")
if (!rdd.isEmpty()) {
rddProcessor.processingRDD(rdd.first(), sqlContext)
}
난 정말 모르겠어요 발생하지 : RDDProcessor이 문제는이 시작
def convertArrayToDataFrame(byteArray: Array[Byte], sqlContext: SQLContext) = {
val stringFromByte = b2s(byteArray)
val rdd = sqlContext.sparkContext.parallelize(stringFromByte.split("\n"))
val rows = rdd.map(_.split(";")).map(attributes => Row.fromSeq(attributes.toSeq))
val dateframe = sqlContext.createDataFrame(rows,RDDComponents.schema)
dateframe
}
직렬화 문제. 누군가가 조언을하면 내가 StreamingContext
val sparkConf = new SparkConf().setAppName("KafkaConsumer")
val ssc = new StreamingContext(sparkConf, Milliseconds(batchDuration))
ssc : SparkContext를 정의 할 때 코드를 제공 할 수 있습니까? – ponkin
좋아요, 나는 이것을 – kasiula03