3

이미 실행중인 다른 Kafka를 구독하기 위해 Apache Kafka 소비자를 구축 중입니다. 내 문제는 제 제작자가 서버에 메시지를 푸시 할 때 ... 소비자가 메시지를받지 못한다는 것입니다. 내가 링크에서 모든 정보를 얻을 수소비자가 Apache Kafka에서 메시지를 수신하지 않음

<dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka_2.11</artifactId> 
     <version>0.10.0.0</version> 
    </dependency> 
    <dependency> 
     <groupId>org.apache.kafka</groupId> 
     <artifactId>kafka-clients</artifactId> 
     <version>0.10.1.0</version> 
    </dependency> 

: 나는 프로듀서 코드를 제공 여기 , 여기

 Properties properties = new Properties(); 
     properties.put("metadata.broker.list","Running kafka ip addr:9092"); 
     properties.put("serializer.class","kafka.serializer.StringEncoder"); 
     ProducerConfig producerConfig = new ProducerConfig(properties); 
     kafka.javaapi.producer.Producer<String,String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig); 
     String filePath="filepath"; 
     File rootFile= new File(filePath); 
     Collection<File> allFiles = FileUtils.listFiles(rootFile, CanReadFileFilter.CAN_READ, TrueFileFilter.INSTANCE); 
     for(File file : allFiles) { 
      StringBuilder sb = new StringBuilder(); 
      sb.append(file); 
      KeyedMessage<String, String> message =new KeyedMessage<String, String>(TOPIC,sb.toString()); 
      System.out.println("sending msg from producer.."+sb.toString()); 
      producer.send(message); 
     } 
      producer.close(); 

소비자 코드,

  properties.put("bootstrap.servers","Running zookeaper ip addr:2181"); 
     properties.put("group.id","test-group"); 
     properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     properties.put("enable.auto.commit", "false"); 

     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties); 
     consumer.subscribe(Collections.singletonList(topicName)); 
     while (true) { 
       ConsumerRecords<String, String> records = consumer.poll(100); 
       for (ConsumerRecord<String, String> record : records) 
       { 
        System.out.println("topic = "+record.topic()); 
        System.out.println("topic = "+record.partition()); 
        System.out.println("topic = "+record.offset()); 
       } 
       try { 
        consumer.commitSync(); 
       } catch (CommitFailedException e) { 
        System.out.printf("commit failed", e) ; 
       } 
      } 

나는이 종속성을 사용합니다 :
우리는 소비자를 실행하는 경우 https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

, 우리는 소비자 측면에서 어떤 통지를하지 않았다. 제발 생각 좀 해줘. 생산자에 대한

+0

어디서 문제인지 파악하려고합니다. 소비자 또는 제작자 크기. 이를 위해 : 주제의 오프셋을 확인하십시오. 명령 줄 – Natalia

+0

에서 수행 할 수 있습니다. 클러스터의 jar 파일로 실행하고 있습니까? .. 귀하의 사육사 포트를 확인하십시오. –

+0

@Natalia : 제작자를 통해 메시지를 게시 할 수 있습니다. 로그 크기와 함께 증가하는 메시지 번호를 볼 수 있습니다. 그러나 오프셋이 증가하지 않습니다 ... –

답변

0

는 :

properties.put("metadata.broker.list","Running kafka ip addr:9092"); 

것 같아요,이 "bootstrap.servers"이어야합니다. 소비자

:

properties.put("bootstrap.servers","Running zookeaper ip addr:2181"); 

bootstrap.servers 브로커를 가리켜 야, ZK하지.

"문제"는 사용자가 브로커를 기다리지 만 지정된 호스트/포트에 브로커가 없으면 실패하지 않는다는 것입니다.

+0

우리는 동일한 IP에서 브로커와 사육사를 모두 실행합니다. 그것의 단일 노드 설치. 따라서 동일한 IP를 주었다. 나는 다른 vms에서 사육사와 중개인을 운영해야만 하는가 ?? –

+0

다른 서버에서 실행할 필요는 없지만 권장됩니다. 어쨌든, ZK와 중개인은 다른 포트를 사용하고,'2181'은 ZK 기본 포트입니다 - 그래서 브로커 포트를 가리 키도록하십시오 (기본값 :'9092') –

+0

@ Matthias J. Sax - 소비자 측의 모든 메시지 –

0

나는 카프카와 자바에서 newb 해요,하지만 난

  • 는 생산자가 실제로 다음 명령 /usr/bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic KumarTopic --from-beginning을 사용하여 주제에 기록되어 있는지 확인합니다 다음과 같은 방법을 제안하고자합니다.
  • 그렇다면 소비자 코드에 집중해야 할 것입니다. Confluent의 가이드는 꽤 도움이됩니다.
+0

@ Zigmaphi-Thnaks, 나는 이미 확인했다. 생산자가 주제를 완벽하게 작성하고 소비자가 실행 중이지만 여전히 메시지를받지 못함 –

관련 문제