2016-06-29 2 views
0

Apache Kafka에 대기열 시스템을 구축했습니다. 응용 프로그램은 특정 Kafka topic에 대한 메시지를 생성하며 소비자 측에서는 해당 주제에 대해 생성 된 모든 레코드를 소비해야합니다.
새로운 Java Consumer API를 사용하여 소비자를 작성했습니다. 내가 어떤 기록이 즉시 소비 및 처리해야한다 생산자에 의해 카프카 주제로 밀어 있도록 영원히 소비자를 실행할 필요가 여기에 kafka 소비자 (새로운 소비자 API)를 영구적으로 실행 중

Properties props = new Properties(); 
        props.put("bootstrap.servers", kafkaBrokerIp+":9092"); 
        props.put("group.id",groupId); 
        props.put("enable.auto.commit", "true"); 
        props.put("session.timeout.ms", "30000"); 
        props.put("auto.offset.reset", "earliest"); 
     props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); 
     KafkaConsumer<String, String> consumer = new KafkaConsumer(props); 
        consumer.subscribe(Arrays.asList("consumertest")); 
        while (true) { 
         ConsumerRecords<String, String> records = consumer.poll(100); 
         for (ConsumerRecord<String, String> record : records){ 
          System.out.println("Data recieved : "+record.value()); 
          } 
        } 

처럼 코드가 보인다.
내 혼란은 샘플 코드처럼 무한 while 루프를 사용하여 데이터를 소비하는 올바른 방법일까요?

답변

0

나를 위해 작동하지만 예외를 throw하는 경우 try/catch 블록에 내부 루프를 넣을 수 있습니다. 또한 연결을 끊으면 정기적 인 재 연결 작업을 고려하십시오.

0

예, 무한 루프를 사용할 수 있습니다. 실제로, 그것은 바쁜 루프가 아닙니다. 각 폴링 중 데이터를 사용할 수없는 경우 통화는 주어진 시간 동안 대기합니다.

long millisToWait = 100; 
consumer.poll(millisToWait); 

new-consumer는 네트워크 통신 문제를 자동으로 처리합니다. 종료시 소비자가 정상적으로 종료되는지 확인하십시오.

관련 문제