2016-10-07 3 views
0

Kafka DirectStream을 사용하고 각 파티션의 RDD를 처리하고 DB에 처리 된 값을 쓰려고합니다. reduceByKey (파티션 당, 셔플없이)를 수행하려고하면 다음 오류가 발생합니다. 일반적으로 드라이버 노드에서 sc.parallelize (Iterator)를 사용하여이 문제를 해결할 수 있습니다. 하지만 스파크 스트리밍으로 해결하고 싶습니다.Spark Streaming - Iterator의 파티션에서 reduceByKey를 사용하는 방법

value reduceByKey is not a member of Iterator[((String, String), (Int, Int))] 

파티션 내의 반복자에 대한 변환을 수행하는 방법이 있습니까?

myKafkaDS 
    .foreachRDD { rdd => 
    val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges 
    val commonIter = rdd.mapPartitionsWithIndex ((i,iter) => { 

     val offset = offsetRanges(i) 

     val records = iter.filter(item => { 
     (some_filter_condition) 
     }).map(r1 => { 
     // Some processing 
     ((field2, field2), (field3, field4)) 
     }) 

     val records.reduceByKey((a,b) => (a._1+b._1, a._2+b._2)) // Getting reduceByKey() is not a member of Iterator 
     // Code to write to DB  
     Iterator.empty // I just want to store the processed records in DB. So returning empty iterator 
    }) 
} 

더 세련된 방법으로 각 파티션에 대한 kafka RDD를 처리하고 DB에 저장하십시오.

+0

저장하려는 데이터베이스는 무엇입니까? 거기에 많은 데이터베이스에 사용할 수있는 스파크 DB 커넥터 API가 있습니다, 당신은 쉽게 데이터베이스에 RDD를 저장할 수 있습니다. – Shankar

+0

@Shankar 내 관심사는 DB에 저장되지 않습니다. 그러나 동일한 카프카 오프셋에 속한 RDD를 처리하고 오프셋과 데이터를 저장하여 처리 된 오프셋을 추적 할 수 있습니다. – santhosh

답변

0

당신의 레코드 값은 반복자이고 RDD가 아닙니다. 따라서 레코드 관계에서 reduceByKey를 호출 할 수 없습니다.

0

구문 문제 : 오타하지 않을 경우

1) reduceByKey 로직이 좋아 보이는

이, (문 앞에 발을 제거하시기 바랍니다) &지도 후 reduceByKey()를 첨부 :

.map(r1 => { 
    // Some processing 
    ((field2, field2), (field3, field4)) 
    }).reduceByKey((a,b) => (a._1+b._1, a._2+b._2)) 

2) 후 iter.next 추가 각 반복의 끝.

3) iter.empty가 잘못 배치되었습니다.

val commonIter = rdd.mapPartitionsWithIndex ((i,iter) => if (i == 0 && iter.hasNext){ 
.... 
}else iter),true) 
0

을 그래서 ... 우리는 mapPartitionsWithIndex 내에서 스파크 변환을 사용할 수 없습니다 mapPartitionsWithIndex 나온 후 넣어()

4 ) 안전을 위해 반복자 조건을 추가합니다. 그러나 스칼라 변환을 사용하고 groupby와 같은 메소드를 줄이면이 ​​문제를 해결할 수있었습니다.

관련 문제