2016-06-23 2 views
0

카프카 위치 설치에 대한 명령 줄에서 메시지를 보내고받을 수 있습니다. Java 코드를 통해 메시지를 보낼 수도 있습니다. 그리고 그 메시지는 Kafka 명령 프롬프트에 표시됩니다. 또한 Kafka 소비자를위한 Java 코드도 있습니다. 어제 코드가 메시지를 받았습니다. 그러나 오늘 아침에 어떤 메시지도받지 못합니다. 코드는 변경되지 않았습니다. 속성 설정이 옳지 않은지 궁금합니다.카프카 소비자 - 일관성없이 메시지 수신

생산자 :

bootstrap.servers - localhost:9092 
group.id - test 
key.serializer - StringSerializer.class.getName() 
value.serializer - StringSerializer.class.getName() 

과 ProducerRecord이

ProducerRecord<String, String>("test", "mykey", "myvalue") 

소비자로 설정됩니다

zookeeper.connect - "localhost:2181" 
group.id - "test" 
zookeeper.session.timeout.ms - 500 
zookeeper.sync.time.ms - 250 
auto.commit.interval.ms - 1000 
key.deserializer - org.apache.kafka.common.serialization.StringDeserializer 
value.deserializer - org.apache.kafka.common.serialization.StringDeserializer 

및 Java 코드 :

여기 내 구성입니다
Map<String, Integer> topicCount = new HashMap<>(); 
    topicCount.put("test", 1); 

    Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer 
      .createMessageStreams(topicCount); 
    List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic); 

무엇이 누락 되었습니까?

답변

0

여러 가지 일이 발생할 수 있습니다.

먼저 소비자의 ZooKeeper 세션 시간 초과가 매우 낮습니다. 이는 가비지 수집 일시 중지로 인해 많은 "소프트 오류"가 발생할 수 있음을 의미합니다. 이러한 상황이 발생하면 소비자 그룹이 다시 균형을 맞춰 소비를 일시 중지 할 수 있습니다. 그리고 이런 일이 자주 일어나면 소비자는 끊임없이 재조정되기 때문에 메시지를 소비하지 않는 상태가 될 수 있습니다. ZooKeeper 세션 시간 제한을 30 초로 늘려이 문제가 해결되는지 확인하는 것이 좋습니다. 그렇다면 실험을 낮추어 실험 할 수 있습니다.

둘째, 새 메시지가 "테스트"항목으로 생성되는지 확인할 수 있습니까? 소비자는 아직 커밋하지 않은 새 메시지 만 소비합니다. 주제에 새로운 메시지가 없을 수도 있습니다.

셋째, 동일한 소비자 그룹에있는 다른 소비자가 메시지를 처리 ​​할 수 ​​있습니까? 한 명의 소비자가 자주 오류가 발생하면 다른 사용자에게 해당 파티션이 할당됩니다.

마지막으로, 결국 제거 될 "오래된"소비자를 사용하고 있습니다. 가능하다면 Kafka 0.9에서 사용 가능한 "새로운"소비자 (KafkaConsumer.java)로 이동하는 것이 좋습니다. 약속 할 수는 없지만 문제가 해결 될 것입니다.

희망이 도움이됩니다.

관련 문제