2016-06-29 3 views
0

카프카 (Kafka) 고급 소비자입니다.카프카 소비자 상태 확인 방법

public class KafkaHighLevelConsumer implements Runnable { 
    private final KafkaConsumer<String, String> consumer; 
    private final List<String> topics; 
    private final int id; 

    public KafkaHighLevelConsumer(int id, 
         String groupId, 
         List<String> topics,BlockingQueue<String> storyQueue) { 
     this.id = id; 
     this.topics = topics; 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "localhost:9091"); 
     props.put("group.id", groupId); 
     props.put("key.deserializer", StringDeserializer.class.getName()); 
     props.put("value.deserializer", StringDeserializer.class.getName()); 
     this.consumer = new KafkaConsumer<>(props); 
    } 

    @Override 
    public void run() { 
     try { 
      consumer.subscribe(topics); 

      while (true) { 
       ConsumerRecords<String, String> records = consumer.poll(100); 
       for (ConsumerRecord<String, String> record : records) { 
        Map<String, Object> data = new HashMap<>(); 
        data.put("partition", record.partition()); 
        data.put("offset", record.offset()); 
        data.put("value", record.value()); 
        System.out.println(this.id + ": " + data); 
       } 
      } 
     } catch (WakeupException e) { 
      // ignore for shutdown 
     }finally { 
      consumer.close(); 
     } 
    } 

    public void shutdown() { 
     consumer.wakeup(); 
    } 
} 

소비자는 정상적으로 작동하지만 소비자 상태를 모니터링해야합니다. 서버 ip 또는 포트가 올바르지 않은 경우 예외가 발생하지 않는 이유는 무엇입니까?

포트를 일부 잘못 변경하면 props.put("bootstrap.servers", "localhost:9091");에서 props.put("bootstrap.servers", "localhost:100500");까지 예외가 발생하지 않습니다.

카프카에 성공적으로 연결되었는지 여부를 알고 싶습니다. 그러한 사건을 처리 할 수 ​​있습니까?

나는 그런 메이븐

<dependency> 
      <groupId>org.apache.kafka</groupId> 
      <artifactId>kafka-clients</artifactId> 
      <version>0.9.0.1</version> 
     </dependency> 

감사를 deps 사용!

+0

kafka-consumer-groups.sh를 사용하여 주제 소비자와 그 상태를 볼 수 있습니다. – MGolovanov

+0

클라이언트는 브로커가 다운되었을 것으로 가정하기 때문에 예외는 없습니다. 클라이언트가 온라인 상태가되면 브로커에 연결합니다. –

답변

0

클라이언트 문서 (https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html)에 따르면

그것은 투명하게 카프카 클러스터에

도서관 임시 실패의 차이를 말할 수를 서버의 장애를 처리 할 영구 실패 또는 잘못된 구성. 따라서 모든 실패가 "재 시도 가능"하다고 간주하고 정확하게 수행하며 오류를 반환하지 않고 영원히 재 시도합니다. 반환 보장되는 몇 가지 정보를 요청 몇 가지 적절한 시간을 기다리고 아무것도 후 반환하는 경우 연결이 일부 문제가 있음을 알고 :

는 연결의 상태를 확인 할 수있는 해결 방법입니다

int CONNECTION_TEST_TIMEOUT_SECONDS = 10; // or whatever is appropriate for your environment 

ExecutorService executor = Executors.newSingleThreadExecutor(); 
Runnable testTask = consumer::listTopics; 

Future future = executor.submit(testTask); 
try { 
    future.get(CONNECTION_TEST_TIMEOUT_SECONDS, TimeUnit.SECONDS); 
} catch (TimeoutException te) { 
    consumer.wakeup(); 
    throw new IOException("Could not communicate with the server within " + CONNECTION_TEST_TIMEOUT_SECONDS + " seconds"); 
} catch (InterruptedException e) { 
    // Nothing to do. Maybe a warning in the log? 
} catch (ExecutionException e) { 
    throw new IOException("Exception while running connection test: " + e.getMessage(), e); 
} 
관련 문제