2017-02-17 4 views
1

카프카 0.10.1.1과 폭풍우 1.0.2를 사용하고 있습니다. kafka 통합을위한 폭풍 문서에서 나는 zookeeper 서버를 사용하여 kafka 스파우트를 초기화 할 때 오프셋이 여전히 사육사를 사용하여 유지되고 있음을 볼 수 있습니다. 어떻게 카푸카 서버를 사용하여 스파우트를 부트 스트랩 할 수 있습니까? 이에 대한 예제가 있습니다. 사육사를 사용하여 폭풍 문서Kafka 스파우트 통합

BrokerHosts hosts = new ZkHosts(zkConnString); 
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString()); 
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig); 

이 옵션에서 예는 잘 작동하고 메시지를 소비한다. 하지만 나는 kafkamanager ui에서 소비자 그룹 또는 폭풍우 노드를 소비자로 볼 수 없었습니다.

대체 방법이 시도되었습니다.

KafkaSpoutConfig<String, String> kafkaSpoutConfig = newKafkaSpoutConfig(); 

KafkaSpout<String, String> spout = new KafkaSpout<>(kafkaSpoutConfig); 

private static KafkaSpoutConfig<String, String> newKafkaSpoutConfig() { 

     Map<String, Object> props = new HashMap<>(); 
     props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, bootstrapServers); 
     props.put(KafkaSpoutConfig.Consumer.GROUP_ID, GROUP_ID); 
     props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, 
       "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, 
       "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true"); 

     String[] topics = new String[1]; 
     topics[0] = topicName; 

     KafkaSpoutStreams kafkaSpoutStreams = 
       new KafkaSpoutStreamsNamedTopics.Builder(new Fields("message"), topics).build(); 

     KafkaSpoutTuplesBuilder<String, String> tuplesBuilder = 
       new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(new TuplesBuilder(topicName)).build(); 

     KafkaSpoutConfig<String, String> spoutConf = 
       new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, tuplesBuilder).build(); 

     return spoutConf; 
    } 

그러나이 솔루션은 kafka에서 몇 개의 메시지를 읽은 후 CommitFailedException을 표시합니다.

답변

0

Storm-kafka는 일반 kafka 클라이언트와 함께 사육사에서 다른 위치와 다른 형식으로 소비자 정보를 작성합니다. 그래서 당신은 kafkamanager ui에서 그것을 볼 수 없습니다.

https://github.com/keenlabs/capillary과 같은 다른 모니터 도구를 찾을 수 있습니다.

관련 문제