2017-12-21 1 views
1

카프카를 처음 접했을 때 파일에서 레코드를 읽고 제작자를 통해 카프카 주제에 메시지를 보낼 수 있지만 소비자를 통해 동일한 주제를 사용할 수는 없습니다.kafka 소비자를 사용하여 kafka 주제를 읽을 수 없습니까?

참고 : 출력 지금 여기

package implementation; 
import java.io.BufferedReader; 
//import java.io.BufferedWriter; 
import java.io.File; 
import java.io.FileInputStream; 
import java.io.FileNotFoundException; 
//import java.io.FileOutputStream; 
import java.io.IOException; 
import java.io.InputStreamReader; 
import java.util.Arrays; 
//import java.io.OutputStreamWriter; 
import java.util.Properties; 
import java.util.Random; 
import java.util.UUID; 

import org.apache.kafka.clients.consumer.Consumer; 
import org.apache.kafka.clients.consumer.ConsumerRecord; 
import org.apache.kafka.clients.consumer.ConsumerRecords; 
import org.apache.kafka.clients.consumer.KafkaConsumer; 
import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.common.errors.WakeupException; 
import org.apache.log4j.BasicConfigurator; 

import kafka.producer.ProducerConfig; 

public class File_to_Kafka extends ProducerConfig{ 

    Properties configProperties; 
    public File_to_Kafka(Properties originalProps) { 
     super(originalProps); 
     configProperties = originalProps; 
     // TODO Auto-generated constructor stub 
    } 

    public String topicName = "temp"+Math.random(); 

    public String groupId = UUID.randomUUID().toString();   



     public void producerKafka(Properties configProperties) throws IOException 
     { 


      FileInputStream fis = new FileInputStream("/home/nick/Desktop/Database-Kafka-ElasticSearch/src/main/java/resources/properties.xml"); 

      configProperties.load(fis); 
      System.out.println(configProperties); 
      org.apache.kafka.clients.producer.Producer<String, String> producer = new KafkaProducer<String, String>(configProperties); 

      File f1 = new File("/home/niket/Desktop/sample-example.txt"); 
      FileInputStream fis1 = new FileInputStream(f1); 
      BufferedReader br1 = new BufferedReader(new InputStreamReader(fis1)); 
      String str = br1.readLine(); 
      //while(br1.readLine()!=null) 
      while(str != null) 
      { 
       ProducerRecord<String, String> rec = new ProducerRecord<String, String>(topicName, str); 
       producer.send(rec); 
       str = br1.readLine(); 
      } 

      br1.close(); 
      fis.close(); 
      fis1.close(); 
      producer.close(); 

     } 

     public void consumerKafka() throws InterruptedException 
     { 
      ConsumerThread consumerRunnable = new ConsumerThread(topicName, groupId); 
      consumerRunnable.start(); 
      Thread.sleep(100); 
      consumerRunnable.getKafkaConsumer().wakeup(); 
      System.out.println("Stopping consumer ....."); 
      consumerRunnable.join(); 
     } 

     private static class ConsumerThread extends Thread{ 

      private String topicName; 
      private String groupId; 
      private KafkaConsumer<String, String> kafkaConsumer; 


      public ConsumerThread(String topicName, String groupId2) { 
       super(); 
       this.topicName = topicName; 
       this.groupId = groupId2; 
      } 

      public void run() 
      { 
       Properties configProperties = new Properties(); 
       configProperties.put("bootstrap.servers","localhost:9092"); 
       configProperties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
       configProperties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); 
       configProperties.put("group.id", groupId); 
       configProperties.put("CLIENT_ID_CONFIG", "simple"); 

       //Figure out where to tart processing messages from 
       kafkaConsumer = new KafkaConsumer<String, String>(configProperties); 
       kafkaConsumer.subscribe(Arrays.asList(topicName)); 
       int count=0; 



       //Start Processing Messages 
       try { 
         while(true) { 
          ConsumerRecords<String, String> records = kafkaConsumer.poll(100); 
          count = 0; 
          for (ConsumerRecord<String, String> record : records) 
          { 
            System.out.println(record.value()); 
            count++; 
          } 
          kafkaConsumer.commitAsync(); 
          if(count==records.count()) 
           break; 
         } 

        } 
       catch (WakeupException e) { 
        // TODO: handle exception 
        System.out.println("Exception caught : "+ e.getMessage()); 
       } 
       finally { 
        kafkaConsumer.close(); 
        System.out.println("After Closing KafkaConsumer"); 
       } 

      } 
      public KafkaConsumer<String,String> getKafkaConsumer(){ 
       return this.kafkaConsumer; 
       } 


     } 

     public static void main(String [] args) throws IOException, InterruptedException 
     { 
      BasicConfigurator.configure(); 
      Properties configProperties = new Properties(); 
      configProperties.put("bootstrap.servers", "localhost:9092"); 
      configProperties.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); 
      configProperties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
      configProperties.put("metadata.broker.list", "localhost:9092"); 
      File_to_Kafka obj = new File_to_Kafka(configProperties); 
      obj.producerKafka(configProperties); 
      obj.consumerKafka(); 

     } 

} 

됩니다 : 당신은 어떤 텍스트 파일에서 데이터를 읽을 수 있고, 내가 kafka_2.11-0.9.0.0 버전 여기

을 사용한 것은 내 코드입니다

0 [main] INFO kafka.utils.VerifiableProperties - Verifying properties 
61 [main] WARN kafka.utils.VerifiableProperties - Property bootstrap.servers is not valid 
62 [main] WARN kafka.utils.VerifiableProperties - Property key.serializer is not valid 
62 [main] INFO kafka.utils.VerifiableProperties - Property metadata.broker.list is overridden to localhost:9092 
62 [main] WARN kafka.utils.VerifiableProperties - Property value.serializer is not valid 
{<name>BOOTSTRAP_SERVERS_CONFIG=(bootstrap.servers)</name>, key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer, <value>org.apache.kafka.common.serialization.StringSerializer</value>=, <name>KEY_SERIALIZER_CLASS_CONFIG</name>=, metadata.broker.list=localhost:9092, <configuration>=, <?xml=version="1.0"?>, <name>KEY_DESERIALIZER_CLASS_CONFIG</name>=, <property>=, <value>org.apache.kafka.common.serialization.StringDeserializer</value>=, <value>localhost=9092</value>, bootstrap.servers=localhost:9092, <name>VALUE_DESERIALIZER_CLASS_CONFIG</name>=, <value>org.apache.kafka.common.serialization.ByteArraySerializer</value>=, </property>=, value.serializer=org.apache.kafka.common.serialization.StringSerializer, </configuration>=, <name>VALUE_SERIALIZER_CLASS_CONFIG</name>=} 
86 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
    compression.type = none 
    metric.reporters = [] 
    metadata.max.age.ms = 300000 
    metadata.fetch.timeout.ms = 60000 
    reconnect.backoff.ms = 50 
    sasl.kerberos.ticket.renew.window.factor = 0.8 
    bootstrap.servers = [localhost:9092] 
    retry.backoff.ms = 100 
    sasl.kerberos.kinit.cmd = /usr/bin/kinit 
    buffer.memory = 33554432 
    timeout.ms = 30000 
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer 
    sasl.kerberos.service.name = null 
    sasl.kerberos.ticket.renew.jitter = 0.05 
    ssl.keystore.type = JKS 
    ssl.trustmanager.algorithm = PKIX 
    block.on.buffer.full = false 
    ssl.key.password = null 
    max.block.ms = 60000 
    sasl.kerberos.min.time.before.relogin = 60000 
    connections.max.idle.ms = 540000 
    ssl.truststore.password = null 
    max.in.flight.requests.per.connection = 5 
    metrics.num.samples = 2 
    client.id = 
    ssl.endpoint.identification.algorithm = null 
    ssl.protocol = TLS 
    request.timeout.ms = 30000 
    ssl.provider = null 
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
    acks = 1 
    batch.size = 16384 
    ssl.keystore.location = null 
    receive.buffer.bytes = 32768 
    ssl.cipher.suites = null 
    ssl.truststore.type = JKS 
    security.protocol = PLAINTEXT 
    retries = 0 
    max.request.size = 1048576 
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer 
    ssl.truststore.location = null 
    ssl.keystore.password = null 
    ssl.keymanager.algorithm = SunX509 
    metrics.sample.window.ms = 30000 
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner 
    send.buffer.bytes = 131072 
    linger.ms = 0 

93 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bufferpool-wait-time 
96 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name buffer-exhausted-records 
99 [main] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [Node(-1, localhost, 9092)], partitions = []) 
116 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed:client-id-producer-1 
116 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created:client-id-producer-1 
116 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:client-id-producer-1 
116 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:client-id-producer-1 
117 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received:client-id-producer-1 
117 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time:client-id-producer-1 
118 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time:client-id-producer-1 
122 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name batch-size 
123 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name compression-rate 
123 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name queue-time 
123 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name request-time 
124 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name produce-throttle-time 
124 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-per-request 
125 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name record-retries 
125 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name errors 
125 [main] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name record-size-max 
126 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Starting Kafka producer I/O thread. 
126 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <value>localhost = 9092</value> was supplied but isn't a known config. 
126 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration </configuration> = was supplied but isn't a known config. 
126 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <property> = was supplied but isn't a known config. 
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <value>org.apache.kafka.common.serialization.StringDeserializer</value> = was supplied but isn't a known config. 
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <name>VALUE_DESERIALIZER_CLASS_CONFIG</name> = was supplied but isn't a known config. 
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <value>org.apache.kafka.common.serialization.StringSerializer</value> = was supplied but isn't a known config. 
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <configuration> = was supplied but isn't a known config. 
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <value>org.apache.kafka.common.serialization.ByteArraySerializer</value> = was supplied but isn't a known config. 
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <name>KEY_SERIALIZER_CLASS_CONFIG</name> = was supplied but isn't a known config. 
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <name>BOOTSTRAP_SERVERS_CONFIG = (bootstrap.servers)</name> was supplied but isn't a known config. 
127 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration </property> = was supplied but isn't a known config. 
129 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <name>VALUE_SERIALIZER_CLASS_CONFIG</name> = was supplied but isn't a known config. 
129 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <name>KEY_DESERIALIZER_CLASS_CONFIG</name> = was supplied but isn't a known config. 
129 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration metadata.broker.list = localhost:9092 was supplied but isn't a known config. 
129 [main] WARN org.apache.kafka.clients.producer.ProducerConfig - The configuration <?xml = version="1.0"?> was supplied but isn't a known config. 
130 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0 
131 [main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a 
131 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - Kafka producer started 
199 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initialize connection to node -1 for sending metadata request 
199 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at localhost:9092. 
254 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent 
255 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received 
255 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency 
255 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node -1 
267 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=0,client_id=producer-1}, body={topics=[temp0.8655521798253616]}), isInitiatedByNetworkClient, createdTimeMs=1513840470088, sendTimeMs=0) to node -1 
502 [kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - Error while fetching metadata with correlation id 0 : {temp0.8655521798253616=LEADER_NOT_AVAILABLE} 
502 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 2 to Cluster(nodes = [Node(0, niket-Lenovo-Y50-70, 9092)], partitions = []) 
502 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Adding node 0 to nodes ever seen 
599 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initialize connection to node 0 for sending metadata request 
599 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node 0 at niket-Lenovo-Y50-70:9092. 
599 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-sent 
600 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.bytes-received 
600 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node-0.latency 
600 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node 0 
600 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.NetworkClient - Sending metadata request ClientRequest(expectResponse=true, callback=null, request=RequestSend(header={api_key=3,api_version=0,correlation_id=1,client_id=producer-1}, body={topics=[temp0.8655521798253616]}), isInitiatedByNetworkClient, createdTimeMs=1513840470433, sendTimeMs=0) to node 0 
611 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 3 to Cluster(nodes = [Node(0, niket-Lenovo-Y50-70, 9092)], partitions = [Partition(topic = temp0.8655521798253616, partition = 0, leader = 0, replicas = [0,], isr = [0,]]) 
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.temp0.8655521798253616.records-per-batch 
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.temp0.8655521798253616.bytes 
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.temp0.8655521798253616.compression-rate 
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.temp0.8655521798253616.record-retries 
619 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name topic.temp0.8655521798253616.record-errors 
646 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. 
647 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Beginning shutdown of Kafka producer I/O thread, sending remaining records. 
667 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:client-id-producer-1 
667 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-created:client-id-producer-1 
667 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent-received:client-id-producer-1 
667 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-received:client-id-producer-1 
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent:client-id-producer-1 
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time:client-id-producer-1 
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time:client-id-producer-1 
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-sent 
668 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-received 
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.latency 
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.bytes-sent 
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.bytes-received 
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node-0.latency 
669 [kafka-producer-network-thread | producer-1] DEBUG org.apache.kafka.clients.producer.internals.Sender - Shutdown of Kafka producer I/O thread has completed. 
669 [main] DEBUG org.apache.kafka.clients.producer.KafkaProducer - The Kafka producer has closed. 
674 [Thread-1] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
    metric.reporters = [] 
    metadata.max.age.ms = 300000 
    value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 
    group.id = 2a4549ce-0e9d-4a66-9573-c5b4c47b3b34 
    partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor] 
    reconnect.backoff.ms = 50 
    sasl.kerberos.ticket.renew.window.factor = 0.8 
    max.partition.fetch.bytes = 1048576 
    bootstrap.servers = [localhost:9092] 
    retry.backoff.ms = 100 
    sasl.kerberos.kinit.cmd = /usr/bin/kinit 
    sasl.kerberos.service.name = null 
    sasl.kerberos.ticket.renew.jitter = 0.05 
    ssl.keystore.type = JKS 
    ssl.trustmanager.algorithm = PKIX 
    enable.auto.commit = true 
    ssl.key.password = null 
    fetch.max.wait.ms = 500 
    sasl.kerberos.min.time.before.relogin = 60000 
    connections.max.idle.ms = 540000 
    ssl.truststore.password = null 
    session.timeout.ms = 30000 
    metrics.num.samples = 2 
    client.id = 
    ssl.endpoint.identification.algorithm = null 
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer 
    ssl.protocol = TLS 
    check.crcs = true 
    request.timeout.ms = 40000 
    ssl.provider = null 
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1] 
    ssl.keystore.location = null 
    heartbeat.interval.ms = 3000 
    auto.commit.interval.ms = 5000 
    receive.buffer.bytes = 32768 
    ssl.cipher.suites = null 
    ssl.truststore.type = JKS 
    security.protocol = PLAINTEXT 
    ssl.truststore.location = null 
    ssl.keystore.password = null 
    ssl.keymanager.algorithm = SunX509 
    metrics.sample.window.ms = 30000 
    fetch.min.bytes = 1024 
    send.buffer.bytes = 131072 
    auto.offset.reset = latest 

675 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Starting the Kafka consumer 
675 [Thread-1] DEBUG org.apache.kafka.clients.Metadata - Updated cluster metadata version 1 to Cluster(nodes = [Node(-1, localhost, 9092)], partitions = []) 
675 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-closed:client-id-consumer-1 
675 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name connections-created:client-id-consumer-1 
675 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent-received:client-id-consumer-1 
675 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-sent:client-id-consumer-1 
676 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-received:client-id-consumer-1 
676 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name select-time:client-id-consumer-1 
676 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name io-time:client-id-consumer-1 
683 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name heartbeat-latency 
683 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name join-latency 
684 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name sync-latency 
685 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name commit-latency 
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name bytes-fetched 
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-fetched 
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name fetch-latency 
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name records-lag 
688 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name fetch-throttle-time 
688 [Thread-1] WARN org.apache.kafka.clients.consumer.ConsumerConfig - The configuration CLIENT_ID_CONFIG = simple was supplied but isn't a known config. 
689 [Thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.9.0.0 
689 [Thread-1] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : fc7243c2af4b2b4a 
689 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Kafka consumer created 
689 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - Subscribed to topic(s): temp0.8655521798253616 
689 [Thread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Issuing group metadata request to broker -1 
690 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - Initiating connection to node -1 at localhost:9092. 
691 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-sent 
691 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.bytes-received 
691 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Added sensor with name node--1.latency 
691 [Thread-1] DEBUG org.apache.kafka.clients.NetworkClient - Completed connection to node -1 
Stopping consumer ..... 
Exception caught : null 
772 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-closed:client-id-consumer-1 
772 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name connections-created:client-id-consumer-1 
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent-received:client-id-consumer-1 
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-received:client-id-consumer-1 
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name bytes-sent:client-id-consumer-1 
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name select-time:client-id-consumer-1 
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name io-time:client-id-consumer-1 
773 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-sent 
774 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.bytes-received 
774 [Thread-1] DEBUG org.apache.kafka.common.metrics.Metrics - Removed sensor with name node--1.latency 
774 [Thread-1] DEBUG org.apache.kafka.clients.consumer.KafkaConsumer - The Kafka consumer has closed. 
After Closing KafkaConsumer 
+1

전체 출력이 강조 표시되었습니다. 사람들을 쉽게 이해할 수있게하십시오. –

답변

0

기본 방법에 문제가있는 것 같습니다. 흐름이 프로듀더에서 멈춘 것처럼 보입니다.

소비자를 별도의 메인 클래스로 시작할 수 있습니까? 소비자가 레코드를 가져 오는 것을 볼 수 있어야합니다.

+0

아직 카프카 주제에서 메시지를 소비 할 수 없습니다. – nick

+0

루프 동안 소비자의 일부 디버그 로그를 추가하고 전체 로그를 공유 할 수 있습니까? –

+0

디버그 로그를 추가 한 후 org.apache.kafka.common.errors.WakeupException을 얻었습니다. – nick

관련 문제