2016-07-26 2 views
2

우리 프로젝트에 Kafka를 사용하기 시작했습니다. 우리는 kafka_2.11-0.9.0.0을 사용하고 있습니다. KafkaConsumer와 관련된 몇 가지 질문이 있습니다.Kafka 소비자 투표 및 재 연결

1) Zookeeper 및 Kafka 서버를 시작하기 전에 Kafka Consumer를 시작했지만 여전히 KafkaConsumer 클라이언트가 연결할 수있었습니다. 나는 코드

Consumer<String, String> consumer = new KafkaConsumer<String,String>(props); 
    consumer.subscribe(getConsumerRegisteredTopics()); 
    while (true) { 
     ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE); 
     for (ConsumerRecord<String, String> record : records){ 
      processRecord (record) 
     } 
    } 

2)가 사육사는 설문 조사 (긴 시간 초과) 메서드 호출의 사용에 의해 활성 소비자의 트랙을 유지 읽기의 라인을 다음했다. zookeeper가 내 소비자를 어떻게 추적하는지 poll()에서 Long.MAX_VALUE가 시간 초과를 사용하면 어떻게됩니까? KafkaConsumer 설문 조사 전화의 행동을 이해하도록 도와주십시오.

미리 감사드립니다.

답변

1

1) 소비자를 시작하기 전에 동물원과 카프카를 시작하지 않으면 연결할 수 없지만 카프카에서 메타 데이터를 읽으려고합니다. 내 경험에 따르면 KafkaConsumer의 '설문 조사'통화는 메타 데이터를 연결하고 읽을 수있을 때까지 미정으로 차단됩니다. 즉 ... 귀하의 소비자는 실제로 연결되어 있지 않지만 카프카 클러스터가 나타날 때까지 기다리고 있습니다.

2) 폴링 시간 초과는 소비자가 데이터를 반환 할 수있을 때까지 기다리는 시간을 알려줍니다. 설문 조사가 반환 된 후 곧 소비자가 계속 설문 조사를 실시 할 수 있도록해야합니다. 설문 조사 통화에 지정된 시간 제한은 KafkaConsumer의 Keepalive 메커니즘과 관련이 없습니다 (이는 사용자의 소비자 속성에 대한 session.timeout.ms 속성에 의해 제어 됨).

+0

답변 해 주셔서 감사합니다. 설문 조사 방법에서 내 고객이 Long.MAX_VALUE를 기다리는 경우 하트 비트를 보내는 방법과 Kafka 서버/사육사가 내 소비자가 아직 살아 있음을 어떻게 알 수 있습니까? – user1874156

+0

응용 프로그램 코드가 작동하는지 확인하기 위해 하트 비트가 있습니다. 프로그램의 제어 흐름이 폴링 메소드 내부에있는 한 하트 비트에 대해 신경 쓸 필요가 없습니다. –

관련 문제