1

Apache Kafka를 사용하고 있습니다. 나는 제작자가 Java로 코딩되고 Consumer가 Scala로 코딩 된 war 파일을 만들었습니다. 제작자가 HTML 페이지에서 데이터를 가져 오는 중입니다. 제작자가 게시 한 대부분의 데이터가 소비자에서 발견된다는 것을 알 수 있지만 일부 데이터가 누락되었습니다. 여기 Apache Kafka Java 제작자 스칼라 소비자 누락 된 스트림

지금 생산자에 대한 내 코드

파일 1

package com.cts.rest; 

import java.util.Properties; 

import kafka.producer.ProducerConfig; 

public class Configuration { 

static ProducerConfig setKafkaProducerParameter() { 
    Properties properties = new Properties(); 
    properties.put("zk.connect", "localhost:2181"); 
    properties.put("metadata.broker.list", "localhost:9092"); 
    properties.put("serializer.class", "kafka.serializer.StringEncoder"); 
    properties.put("acks", 0); 
    ProducerConfig producerConfig = new ProducerConfig(properties); 
    return producerConfig; 
    } 

}

파일 2

package com.cts.rest; 

import kafka.javaapi.producer.Producer; 
import kafka.producer.KeyedMessage; 
import kafka.producer.ProducerConfig; 


public class RTTSKProducer { 

static void sendDataToProducer(String line){ 

    ProducerConfig producerConfig = configuration.setKafkaProducerParameter(); 
    Producer<String, String> producer = new Producer<String, String>(producerConfig);  

    String topic = "jsondata";  
    KeyedMessage<String, String> msg = new KeyedMessage<String, String>(topic, line); 
    System.out.print(msg); 
    producer.send(msg); 
    producer.close(); 
      } 
    } 

이다 나는 소비자 날기에 메시지를 확인하고 g 다음 명령.

bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic jsondata --from-beginning 

생산자 구성이 누락 되었습니까?

답변

1

'acks'설정을 늘려 더 많은 내구성을 확보 할 수 있습니다. 가장 중요한 것은 콜백 함수를 사용하여 'send'메소드를 호출하여 Kafka에 성공적으로 게시되지 않은 메시지를 다음과 같이 처리해야한다는 것입니다.

producer.send(myRecord, 
      new Callback() { 
       public void onCompletion(RecordMetadata metadata, Exception e) { 
        if(e != null) 
         e.printStackTrace(); 
        System.out.println("The offset of the record we just sent is: " + metadata.offset()); 
       } 
      }); 
관련 문제