우리 프로젝트에 Kafka를 사용하기 시작했습니다. 우리는 kafka_2.11-0.9.0.0을 사용하고 있습니다. KafkaConsumer와 관련된 몇 가지 질문이 있습니다.Kafka 소비자 투표 및 재 연결
1) Zookeeper 및 Kafka 서버를 시작하기 전에 Kafka Consumer를 시작했지만 여전히 KafkaConsumer 클라이언트가 연결할 수있었습니다. 나는 코드
Consumer<String, String> consumer = new KafkaConsumer<String,String>(props);
consumer.subscribe(getConsumerRegisteredTopics());
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (ConsumerRecord<String, String> record : records){
processRecord (record)
}
}
2)가 사육사는 설문 조사 (긴 시간 초과) 메서드 호출의 사용에 의해 활성 소비자의 트랙을 유지 읽기의 라인을 다음했다. zookeeper가 내 소비자를 어떻게 추적하는지 poll()에서 Long.MAX_VALUE가 시간 초과를 사용하면 어떻게됩니까? KafkaConsumer 설문 조사 전화의 행동을 이해하도록 도와주십시오.
미리 감사드립니다.
답변 해 주셔서 감사합니다. 설문 조사 방법에서 내 고객이 Long.MAX_VALUE를 기다리는 경우 하트 비트를 보내는 방법과 Kafka 서버/사육사가 내 소비자가 아직 살아 있음을 어떻게 알 수 있습니까? – user1874156
응용 프로그램 코드가 작동하는지 확인하기 위해 하트 비트가 있습니다. 프로그램의 제어 흐름이 폴링 메소드 내부에있는 한 하트 비트에 대해 신경 쓸 필요가 없습니다. –