2016-09-12 3 views
3

정확하게 한 번 의미론을 위해 kafka 오프셋을 관리하려고합니다. 다음과 같이 오프셋지도를 사용하여 직접 스트림을 생성하는 동안KafkaUtils API | 오프셋 관리 | Spark Streaming

문제를 직면 : 여기

val fromOffsets : (TopicAndPartition, Long) = TopicAndPartition(metrics_rs.getString(1), metrics_rs.getInt(2)) -> metrics_rs.getLong(3) 

KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder,(String, String)] (ssc,kafkaParams,fromOffsets,messageHandler) 

val messageHandler = 
     (mmd: MessageAndMetadata[String, String]) => mmd.message.length 

그리고

metrics_rs = metricsStatement.executeQuery("SELECT part,off from metrics.txn_offsets where topic='"+t+''') 

, 나는 내가 뭔가 잘못하고있는 중이 야 생각 선언 스타일 ... 당신이 도울 수 있다면. 컴파일 오류에 "createDirectStream에 대한 형식 인수가 너무 많습니다."

+0

당신이 알고 최신 카프카 0.10 + 호환'KafkaUtils의이다. createDirectStream'? 5 형 0.8 호환 인터페이스를 왜 사용하는지 궁금합니다. –

답변

1

내가 잘못한 것 몇 가지가 있습니다.

Map[TopicAndPartition, Long]을 전달해야하며 현재는 Tuple2[TopicAndPartition, Long]입니다. 그래서 당신이 필요합니다

val fromOffsets: Map[TopicAndPartition, Long] = 
    Map(TopicAndPartition(metrics_rs.getString(1), 
          metrics_rs.getInt(2)) -> metrics_rs.getLong(3)) 

당신은 createDirectStream에서 수익 유형이 유형 (String, String)의 튜플이다, 아직 messageHandler 값이 Int 말한다. 당신은 키 값 쌍 튜플을 반환하려면, 다음이 필요합니다

val messageHandler: MessageAndMetadata[String, String] => (String, String) = 
    (mmd: MessageAndMetadata[String, String]) => (mmd.key(), mmd.message()) 

이 컴파일해야한다는 고정 후 :

val stream = KafkaUtils 
       .createDirectStream[String, String, 
         StringDecoder, StringDecoder, 
         (String, String)] (ssc, 
             kafkaParams, 
             fromOffsets, 
             messageHandler) 
+0

예, 튜플을 Map 및 messageHandler로 변환했지만 여전히 컴파일되지 않습니다. – taransaini43

+0

'kafkaParams'는'Set [String]'입니까? > KafkaConfig.zookeeperHost, "group.id"- - "KafkaConfig.groupId, "auto.commit –

+0

그것은지도 ... 브로 kafkaParams = 맵 [문자열, 문자열 ( "zookeeper.connect"이다. 활성화 "-> KafkaConfig.autoCommitEnabled, "auto.commit.interval.ms "-> KafkaConfig.autoCommitInterval, "bootstrap.servers "-> KafkaConfig.bootstrapServers ....) – taransaini43

관련 문제