2016-09-01 3 views
0

나는 Kafka에서 실행중인 스파크 스트리밍 작업을 실행 중입니다. 나는이 같은 메시지를 얻을 :Spark Streaming GroupBy 처리 할 튜플의 부분

val messageStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, Int, Long, String)](ssc, getKafkaBrokers(), getKafkaTopics("raw"), (mmd: MessageAndMetadata[String, String]) => { 
    (mmd.topic, mmd.partition, mmd.offset, mmd.message) 
}) 

을 이제 내가 원하는에 내가 하나 개의 일괄 처리 할 수있는 동일한 주제/파티션과 모든 있도록 주제와 파티션에 의해 그룹에 데이터를 가지고있다. 여기에 사용할 올바른 기능은 무엇입니까

messageStream.foreachRDD(x => x.? 

그룹입니까? 그것이 그룹이라면 내가 가지고있는 튜플의 처음 두 부분으로 어떻게 묶을까요? KafkaRDD [0]에는 여러 개의 메시지가있을 것이므로 각각을 처리 할 수있는 메시지와 같은 메시지 집합으로 그룹화하려고합니다 청크 대 개별 메시지로 그룹화합니다.

편집 : 그래서 아래의 피드를 기반으로 다시 그래서 나는 이런 식으로 뭔가를 할 것이다 :

messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => { 
     ? 
    })) 

은 K에서 지금가요, K와 같은 V가 (주제, 파티션), 값입니다 (오프셋 (offset) , 주제)? 튜플의 첫 번째와 두 번째 부분이 필요합니다. API 호출을 통해 메시지 처리 방법에 대한 지침을 얻을 수 있기 때문입니다. 내가 원하는 것은 각 메시지에서 주제/파티션을 기반으로 한 동일한 명령어 세트를 가지고 있기 때문에 각 메시지에서 API를 개별적으로 호출하는 것입니다.

K : (주제, 파티션) V : CompactBuffer ((주제, 파티션, 오프셋, 메시지),()) 등

messageStream.foreachRDD(x => x.groupBy(x => (x._1, x._2)).foreach(x => { 
      val topic = x._1_.1 
      val partition = x._1._2 
      x._2.forEach(x=> ... 
     })) 

답변

1
가 지금처럼 온다 실현 : 편집

to group 튜플의 처음 두 부분을 통해 다음을 시도 할 수 있습니다.

messageStream groupBy (x => (x._1, x._2)) 
+0

내가 작성한 답변을 편집했습니다. – theMadKing

+0

값은 여전히 ​​4- 튜플입니다. 3 번째와 4 번째 항목 만 있으면'mapValues ​​(v => (v._3, v._4))'해야합니다. – ryan

+0

튜플의 첫 번째와 두 번째 부분은 API 호출을 만들어 메시지 처리 방법에 대한 지침을 얻을 수 있기 때문에 필요합니다. 내 자신의 상쇄 관리자가 있기 때문에 제 3 부분이 필요합니다. 내가 원하는 것은 각 메시지에서 주제/파티션을 기반으로 한 동일한 명령어 세트를 가지고 있기 때문에 각 메시지에서 API를 개별적으로 호출하는 것입니다. – theMadKing

관련 문제