2016-08-05 3 views
0

비정형적인 문제가 있습니다. kafka에서받은 rdd 처리를 시도 할 때 sparkContext에 액세스하려고하면 예외 (java.lang.NullPointerException)가 발생합니다. 그러나스파크 스트리밍 손실 SparkContext

val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER) 
val sqlContext = new SQLContext(ssc.sparkContext) 
receiver.foreachRDD { rdd => 
    log.info("Received RDD attempt") 
    if (!rdd.isEmpty()) { 
     rdd.foreach(a => rddProcessor.processingRDD(a, sqlContext)) 
    } 

나는 단지 첫째 RDD을 처리 할 때 너무 이유, 문제가

val receiver = KafkaUtils.createStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_AND_DISK_SER) 
val sqlContext = new SQLContext(ssc.sparkContext) 
receiver.foreachRDD { rdd => 
    log.info("Received RDD attempt") 
    if (!rdd.isEmpty()) { 
    rddProcessor.processingRDD(rdd.first(), sqlContext) 
    } 

난 정말 모르겠어요 발생하지 : RDDProcessor이 문제는이 시작

def convertArrayToDataFrame(byteArray: Array[Byte], sqlContext: SQLContext) = { 
val stringFromByte = b2s(byteArray) 
val rdd = sqlContext.sparkContext.parallelize(stringFromByte.split("\n")) 
val rows = rdd.map(_.split(";")).map(attributes => Row.fromSeq(attributes.toSeq)) 
val dateframe = sqlContext.createDataFrame(rows,RDDComponents.schema) 
dateframe 
} 

직렬화 문제. 누군가가 조언을하면 내가 StreamingContext

val sparkConf = new SparkConf().setAppName("KafkaConsumer") 
val ssc = new StreamingContext(sparkConf, Milliseconds(batchDuration)) 
+0

ssc : SparkContext를 정의 할 때 코드를 제공 할 수 있습니까? – ponkin

+0

좋아요, 나는 이것을 – kasiula03

답변

0

음을 정의하는

@EDIT 감사합니다, SparkContext는 직렬화하지이며,이 @transient로 표시된 것 SparkSession 통해 SqlContext에서 사용할 수 있습니다. 그래서 foreach '이나 map'과 같이 직렬화해야하는 람다에서는 사용할 수 없습니다. (foreachRDD의 값은 아니지만 SparkContext을 사용하지 마십시오!) processingRDD을 쓸 수 없다면,).

+0

에 올렸습니다. 이해하지만, 왜 그것이 하나의 rdd에서 작동합니까? 객체가 직렬화되면 모든 함수가 직렬화됩니다. – kasiula03

+0

'foreachRDD'는 다른 노드에 인수를 보내지 않습니다. 드라이버 노드에서 완전히 실행되며 ('rdd.first()'는 다른 노드에서 드라이버 노드로 요소를 보냅니다). 'rdd.foreach'는 각 노드에 인수를 전송하여 거기에서 실행해야합니다. –

+0

어떻게이 문제를 건너 뛸 수 있습니까?이 rdd를 sparkContext로 처리해야합니까? rdd.colect()는 작동하지만 올바르지 않은 방법입니다. – kasiula03

관련 문제