저는 Kafka에서 스트림하는 Spark 소비자가 있습니다. 정확히 한 번 의미에 대한 오프셋을 관리하려고합니다. 그러나RDD에서 KafkaOffset에 액세스하는 중 예외가 발생했습니다.
상기는 다음과 같은 예외가 발생하는 동안 액세스 오프셋 :
"java.lang.ClassCastException가 : org.apache.spark.rdd.MapPartitionsRDD를 가 org.apache.spark.streaming 캐스트 할 수없는 .kafka.HasOffsetRanges "
이 아래와 같다 수행하는 코드의 부분 : 여기
var offsetRanges = Array[OffsetRange]()
dataStream
.transform {
rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}
.foreachRDD(rdd => { })
dataStrea 누군가가 나를 내가 잘못 여기서 뭐하는 거지 이해하는 데 도움이 할 수있는 경우
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, Set(source_schema+"_"+t)).map(_._2)
: m가 직접 스트림 (d 스트림 [문자열]) 같은 KafkaUtils의 API 뭔가를 사용하여 만들었습니다. 변형은 공식 문서에서 언급 된대로 데이터 스트림에서 수행되는 일련의 메서드에서 첫 번째 메서드입니다.
감사합니다.
아를! 바보 나. 감사합니다. 친구. 건배! :) – taransaini43
@ user1521672 당신을 환영합니다. –
또한 오프셋을 사용하여 직접 스트림을 만들려고 시도하는 중 오류가 발생했습니다.
의 Val fromOffsets (TopicAndPartition 롱) = TopicAndPartition (metrics_rs.getString (1) metrics_rs.getInt (2)) -> metrics_rs.getLong (3)
KafkaUtils.createDirectStream [문자열, 문자열 StringDecoder, StringDecoder (문자열, 문자열) (SSC, kafkaParams, fromOffsets,에 messageHandler)
가 가 발에 messageHandler가 = (MMD : MessageAndMetadata [문자열, 문자열) => mmd.message.length 및 metrics_rs 결과 세트되는 오프셋 맵을 가져오고 있습니다. 너무 많은 형식 인수 오류가 발생합니다. – taransaini43