카프카에 새로 입문했습니다. 카프카에서 일하기 시작했습니다. 아래 문제에 직면하고 있습니다. 문제를 해결할 수 있도록 도와주세요. 미리 감사드립니다. 먼저 제작자 API를 작성 중이지만 잘 작동하고 있지만 Consumer API 메시지는 표시되지 않습니다.Kafka 소비자 API가 제대로 작동하지 않습니다.
내 코드는 다음과 같다 :
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
public class ConsumerGroup {
public static void main(String[] args) throws Exception {
String topic = "Hello-Kafka";
String group = "myGroup";
Properties props = new Properties();
props.put("bootstrap.servers", "XXX.XX.XX.XX:9092");
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
try {
consumer.subscribe(Arrays.asList(topic));
System.out.println("Subscribed to topic " + topic);
ConsumerRecords<String, String> records = consumer.poll(100);
System.out.println("records ::" + records);
System.out.println(records.toString());
for (ConsumerRecord<String, String> record : records) {
System.out.println("Record::" + record.offset());
System.out.println(record.key());
System.out.println(record.value());
}
consumer.commitSync();
} catch (Exception e) {
e.printStackTrace();
} finally {
consumer.commitSync();
consumer.close();
}
}
}
응답 :
주제 안녕하세요 - 카프카에 가입 한 기록 :: [email protected] 조직. [email protected]
여기오프셋, 키를 인쇄하지, 값 제어를위한에 오지 않아 (ConsumerRecord 기록 : 기록) {그 루프는 나를 도와주세요.
주제에 대한 메시지를 작성하셨습니까? 당신의 주제와 같은 메세지가 없습니다. – divyesh