Kafka DirectStream을 사용하고 각 파티션의 RDD를 처리하고 DB에 처리 된 값을 쓰려고합니다. reduceByKey (파티션 당, 셔플없이)를 수행하려고하면 다음 오류가 발생합니다. 일반적으로 드라이버 노드에서 sc.parallelize (Iterator)를 사용하여이 문제를 해결할 수 있습니다. 하지만 스파크 스트리밍으로 해결하고 싶습니다.Spark Streaming - Iterator의 파티션에서 reduceByKey를 사용하는 방법
value reduceByKey is not a member of Iterator[((String, String), (Int, Int))]
파티션 내의 반복자에 대한 변환을 수행하는 방법이 있습니까?
myKafkaDS
.foreachRDD { rdd =>
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
val commonIter = rdd.mapPartitionsWithIndex ((i,iter) => {
val offset = offsetRanges(i)
val records = iter.filter(item => {
(some_filter_condition)
}).map(r1 => {
// Some processing
((field2, field2), (field3, field4))
})
val records.reduceByKey((a,b) => (a._1+b._1, a._2+b._2)) // Getting reduceByKey() is not a member of Iterator
// Code to write to DB
Iterator.empty // I just want to store the processed records in DB. So returning empty iterator
})
}
더 세련된 방법으로 각 파티션에 대한 kafka RDD를 처리하고 DB에 저장하십시오.
저장하려는 데이터베이스는 무엇입니까? 거기에 많은 데이터베이스에 사용할 수있는 스파크 DB 커넥터 API가 있습니다, 당신은 쉽게 데이터베이스에 RDD를 저장할 수 있습니다. – Shankar
@Shankar 내 관심사는 DB에 저장되지 않습니다. 그러나 동일한 카프카 오프셋에 속한 RDD를 처리하고 오프셋과 데이터를 저장하여 처리 된 오프셋을 추적 할 수 있습니다. – santhosh