2016-10-31 4 views
2

"이 프로그램이 실행 중입니다"라는 문자열을 카프카 항목으로 보내는 카프카 제작자를 만들려고합니다. 나는 왜 그것이 효과가 없는지 확신하지 못한다. 아래 코드는 다음과 같습니다. 나는 Cloudera 분포가 아니다.카프카 프로듀서 보내기 문자열이 작동하지 않습니다.

package kafka_test; 

import java.util.Properties; 

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.common.serialization.StringSerializer; 

public class DataMovement { 

    public static void main(String[] args) { 

     String kafkaTopic = args[0]; 

     Properties props = new Properties(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server:9092"); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 

     KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); 
     ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(kafkaTopic, null, "This program is running."); 

     producer.send(producerRecord); 
     producer.close(); 

    } 
} 

나는 오류 메시지가 있지만 시간 제한하지 않습니다

또한 client.id 등 카프카, SSL, passowrd에 대한 정보를 많이 출력을

10분의 16/31 10:25:46 정보 utils.AppInfoParser : 카프카 버전 : 0.9.0.1 16/10/31 10:25:46 정보 utils.AppInfoParser : Kafka commitId : commitid 16/10/31 10:26:46 정보 생산자 .KafkaProducer : timeoutMillis = 9223372036854775807 ms 인 Kafka 제작자를 닫습니다.

+0

예외가 있습니까? 또한 제작자를 닫기 전에 Thread.sleep (2000)을 사용하여 잠시 기다릴 수 있습니다. 제작자가 닫히면 kafka는 메시지를 topic으로 보내지 않습니다. – Shankar

+0

또한 메시지 키를 설정하지 않으면 ProducerRecord를 두 개의 인수, 항목 이름 및 메시지와 함께 사용할 수 있습니다. – Shankar

+0

시간 초과 오류입니다. 나는 편집을했다. 좋아요, 열쇠를 바꿔 볼게요. 내가 자려고하자. – Defcon

답변

0

코드가 정상적으로 작동합니다. 문제는 서버였습니다 : 9092는 Kafka 클러스터의 지정된 중개인의 주소 여야합니다 (활성 브로커를 대상으로했기 때문에 다릅니다).

+0

당신은 분명히 할 수 있습니까? 모든 활성 작업자는 부트 스트랩 서버로 작업해야합니다. 필요할 경우 MetadataResponse를 사용하여 클라이언트를 다른 서버와 간단히 리디렉션합니다. –

+0

우리는 두 개의 브로커를 가지고 있으며 각각은 자체 노드를 가지고 있습니다. 어떤 이유로 메시지 중 하나를 지시 할 때 메시지가 채워지지 않습니다 (예 :이 broker_node_2를 예로들 수 있습니다). 우리가 send 데이터를 broker_node_1로 보낼 때 작업이 끝났습니다. 이유를 모르겠다. 클러스터가 설정된 방식 일 수 있습니다. – Defcon

관련 문제