2016-09-09 1 views
2

저는 Kafka에서 스트림하는 Spark 소비자가 있습니다. 정확히 한 번 의미에 대한 오프셋을 관리하려고합니다. 그러나RDD에서 KafkaOffset에 액세스하는 중 예외가 발생했습니다.

상기는 다음과 같은 예외가 발생하는 동안 액세스 오프셋 :

"java.lang.ClassCastException가 : org.apache.spark.rdd.MapPartitionsRDD를 가 org.apache.spark.streaming 캐스트 할 수없는 .kafka.HasOffsetRanges "

이 아래와 같다 수행하는 코드의 부분 : 여기

var offsetRanges = Array[OffsetRange]() 
dataStream 
    .transform { 
    rdd => 
     offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
     rdd 
    } 
    .foreachRDD(rdd => { }) 

dataStrea 누군가가 나를 내가 잘못 여기서 뭐하는 거지 이해하는 데 도움이 할 수있는 경우

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2) 

: m가 직접 스트림 (d 스트림 [문자열]) 같은 KafkaUtils의 API 뭔가를 사용하여 만들었습니다. 변형은 공식 문서에서 언급 된대로 데이터 스트림에서 수행되는 일련의 메서드에서 첫 번째 메서드입니다.

감사합니다.

답변

6

귀하의 문제는 다음과 같습니다 대신 KafkaUtils.createKafkaStream에 의해 생성 된 DirectKafkaInputDStreamMapPartitionedDStream를 생성

.map(._2) 

.

당신은 transformmap해야합니다

val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+""+t)) 

kafkaStream 
    .transform { 
    rdd => 
     offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
     rdd 
    } 
    .map(_._2) 
    .foreachRDD(rdd => // stuff) 
+0

아를! 바보 나. 감사합니다. 친구. 건배! :) – taransaini43

+0

@ user1521672 당신을 환영합니다. –

+0

또한 오프셋을 사용하여 직접 스트림을 만들려고 시도하는 중 오류가 발생했습니다.
의 Val fromOffsets (TopicAndPartition 롱) = TopicAndPartition (metrics_rs.getString (1) metrics_rs.getInt (2)) -> metrics_rs.getLong (3)
KafkaUtils.createDirectStream [문자열, 문자열 StringDecoder, StringDecoder (문자열, 문자열) (SSC, kafkaParams, fromOffsets,에 messageHandler)
가 가 발에 messageHandler가 = (MMD : MessageAndMetadata [문자열, 문자열) => mmd.message.length 및 metrics_rs 결과 세트되는 오프셋 맵을 가져오고 있습니다. 너무 많은 형식 인수 오류가 발생합니다. – taransaini43

관련 문제