2017-04-22 3 views
0

카프카에 새로 입문했습니다. 카프카에서 일하기 시작했습니다. 아래 문제에 직면하고 있습니다. 문제를 해결할 수 있도록 도와주세요. 미리 감사드립니다. 먼저 제작자 API를 작성 중이지만 잘 작동하고 있지만 Consumer API 메시지는 표시되지 않습니다.Kafka 소비자 API가 제대로 작동하지 않습니다.

내 코드는 다음과 같다 :

import java.util.Arrays; 
import java.util.Properties; 

import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 


public class ConsumerGroup { 
    public static void main(String[] args) throws Exception { 

     String topic = "Hello-Kafka"; 
     String group = "myGroup"; 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "XXX.XX.XX.XX:9092"); 
     props.put("group.id", group); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); 
     try { 

      consumer.subscribe(Arrays.asList(topic)); 
      System.out.println("Subscribed to topic " + topic); 


      ConsumerRecords<String, String> records = consumer.poll(100); 

      System.out.println("records ::" + records); 
      System.out.println(records.toString()); 
      for (ConsumerRecord<String, String> record : records) { 
       System.out.println("Record::" + record.offset()); 
       System.out.println(record.key()); 
       System.out.println(record.value()); 
      } 
      consumer.commitSync(); 

     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      consumer.commitSync(); 
      consumer.close(); 
     } 
    } 
} 

응답 :

주제 안녕하세요 - 카프카에 가입 한 기록 :: [email protected] 조직. [email protected]

여기

오프셋, 키를 인쇄하지, 값 제어를위한에 오지 않아 (ConsumerRecord 기록 : 기록) {그 루프는 나를 도와주세요.

+0

주제에 대한 메시지를 작성하셨습니까? 당신의 주제와 같은 메세지가 없습니다. – divyesh

답변

0

빈 레코드를 인쇄하려고하므로 records.toString() 만 코드에 인쇄됩니다.이 코드는 본질적으로 클래스 이름입니다.
코드에 약간의 변경을 가하고 작동 시켰습니다. 이것이 도움이되는지보세요.

public class ConsumerGroup { 
    public static void main(String[] args) throws Exception { 

     String topic = "Hello-Kafka"; 
     String group = "myGroup"; 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "xx.xx.xx.xx:9092"); 
     props.put("group.id", group); 
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
     KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); 
     try { 

      consumer.subscribe(Arrays.asList(topic)); 
      System.out.println("Subscribed to topic " + topic); 

      while(true){ 
       ConsumerRecords<String, String> records = consumer.poll(1000); 
       if(records.isEmpty()){ 

       } 
       else{ 
       System.out.println("records ::" + records); 
       System.out.println(records.toString()); 
       for (ConsumerRecord<String, String> record : records) { 
        System.out.println("Record::" + record.offset()); 
        System.out.println(record.key()); 
        System.out.println(record.value()); 
       } 
       consumer.commitSync(); 
       } 
      } 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } finally { 
      consumer.commitSync(); 
      consumer.close(); 
     } 
    } 
} 
+0

많은 감사합니다. 일하고 있습니다. 이전에 나는 작은 실수를 저질 렀습니다. – Narasimha

관련 문제