카프카 (Kafka) 고급 소비자입니다.카프카 소비자 상태 확인 방법
public class KafkaHighLevelConsumer implements Runnable {
private final KafkaConsumer<String, String> consumer;
private final List<String> topics;
private final int id;
public KafkaHighLevelConsumer(int id,
String groupId,
List<String> topics,BlockingQueue<String> storyQueue) {
this.id = id;
this.topics = topics;
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9091");
props.put("group.id", groupId);
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<>(props);
}
@Override
public void run() {
try {
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
Map<String, Object> data = new HashMap<>();
data.put("partition", record.partition());
data.put("offset", record.offset());
data.put("value", record.value());
System.out.println(this.id + ": " + data);
}
}
} catch (WakeupException e) {
// ignore for shutdown
}finally {
consumer.close();
}
}
public void shutdown() {
consumer.wakeup();
}
}
소비자는 정상적으로 작동하지만 소비자 상태를 모니터링해야합니다. 서버 ip 또는 포트가 올바르지 않은 경우 예외가 발생하지 않는 이유는 무엇입니까?
포트를 일부 잘못 변경하면 props.put("bootstrap.servers", "localhost:9091");
에서 props.put("bootstrap.servers", "localhost:100500");
까지 예외가 발생하지 않습니다.
카프카에 성공적으로 연결되었는지 여부를 알고 싶습니다. 그러한 사건을 처리 할 수 있습니까?
나는 그런 메이븐
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.9.0.1</version>
</dependency>
감사를 deps 사용!
kafka-consumer-groups.sh를 사용하여 주제 소비자와 그 상태를 볼 수 있습니다. – MGolovanov
클라이언트는 브로커가 다운되었을 것으로 가정하기 때문에 예외는 없습니다. 클라이언트가 온라인 상태가되면 브로커에 연결합니다. –