카프카 0.10.1.1과 폭풍우 1.0.2를 사용하고 있습니다. kafka 통합을위한 폭풍 문서에서 나는 zookeeper 서버를 사용하여 kafka 스파우트를 초기화 할 때 오프셋이 여전히 사육사를 사용하여 유지되고 있음을 볼 수 있습니다. 어떻게 카푸카 서버를 사용하여 스파우트를 부트 스트랩 할 수 있습니까? 이에 대한 예제가 있습니다. 사육사를 사용하여 폭풍 문서Kafka 스파우트 통합
BrokerHosts hosts = new ZkHosts(zkConnString);
SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName, UUID.randomUUID().toString());
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
이 옵션에서 예는 잘 작동하고 메시지를 소비한다. 하지만 나는 kafkamanager ui에서 소비자 그룹 또는 폭풍우 노드를 소비자로 볼 수 없었습니다.
대체 방법이 시도되었습니다.
KafkaSpoutConfig<String, String> kafkaSpoutConfig = newKafkaSpoutConfig();
KafkaSpout<String, String> spout = new KafkaSpout<>(kafkaSpoutConfig);
private static KafkaSpoutConfig<String, String> newKafkaSpoutConfig() {
Map<String, Object> props = new HashMap<>();
props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, bootstrapServers);
props.put(KafkaSpoutConfig.Consumer.GROUP_ID, GROUP_ID);
props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER,
"org.apache.kafka.common.serialization.StringDeserializer");
props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
String[] topics = new String[1];
topics[0] = topicName;
KafkaSpoutStreams kafkaSpoutStreams =
new KafkaSpoutStreamsNamedTopics.Builder(new Fields("message"), topics).build();
KafkaSpoutTuplesBuilder<String, String> tuplesBuilder =
new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(new TuplesBuilder(topicName)).build();
KafkaSpoutConfig<String, String> spoutConf =
new KafkaSpoutConfig.Builder<>(props, kafkaSpoutStreams, tuplesBuilder).build();
return spoutConf;
}
그러나이 솔루션은 kafka에서 몇 개의 메시지를 읽은 후 CommitFailedException을 표시합니다.