이미 실행중인 다른 Kafka를 구독하기 위해 Apache Kafka 소비자를 구축 중입니다. 내 문제는 제 제작자가 서버에 메시지를 푸시 할 때 ... 소비자가 메시지를받지 못한다는 것입니다. 내가 링크에서 모든 정보를 얻을 수소비자가 Apache Kafka에서 메시지를 수신하지 않음
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>0.10.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.1.0</version>
</dependency>
: 나는 프로듀서 코드를 제공 여기 , 여기
Properties properties = new Properties();
properties.put("metadata.broker.list","Running kafka ip addr:9092");
properties.put("serializer.class","kafka.serializer.StringEncoder");
ProducerConfig producerConfig = new ProducerConfig(properties);
kafka.javaapi.producer.Producer<String,String> producer = new kafka.javaapi.producer.Producer<String, String>(producerConfig);
String filePath="filepath";
File rootFile= new File(filePath);
Collection<File> allFiles = FileUtils.listFiles(rootFile, CanReadFileFilter.CAN_READ, TrueFileFilter.INSTANCE);
for(File file : allFiles) {
StringBuilder sb = new StringBuilder();
sb.append(file);
KeyedMessage<String, String> message =new KeyedMessage<String, String>(TOPIC,sb.toString());
System.out.println("sending msg from producer.."+sb.toString());
producer.send(message);
}
producer.close();
소비자 코드,
properties.put("bootstrap.servers","Running zookeaper ip addr:2181");
properties.put("group.id","test-group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
consumer.subscribe(Collections.singletonList(topicName));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
{
System.out.println("topic = "+record.topic());
System.out.println("topic = "+record.partition());
System.out.println("topic = "+record.offset());
}
try {
consumer.commitSync();
} catch (CommitFailedException e) {
System.out.printf("commit failed", e) ;
}
}
나는이 종속성을 사용합니다 :
우리는 소비자를 실행하는 경우 https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
, 우리는 소비자 측면에서 어떤 통지를하지 않았다. 제발 생각 좀 해줘. 생산자에 대한
어디서 문제인지 파악하려고합니다. 소비자 또는 제작자 크기. 이를 위해 : 주제의 오프셋을 확인하십시오. 명령 줄 – Natalia
에서 수행 할 수 있습니다. 클러스터의 jar 파일로 실행하고 있습니까? .. 귀하의 사육사 포트를 확인하십시오. –
@Natalia : 제작자를 통해 메시지를 게시 할 수 있습니다. 로그 크기와 함께 증가하는 메시지 번호를 볼 수 있습니다. 그러나 오프셋이 증가하지 않습니다 ... –