"이 프로그램이 실행 중입니다"라는 문자열을 카프카 항목으로 보내는 카프카 제작자를 만들려고합니다. 나는 왜 그것이 효과가 없는지 확신하지 못한다. 아래 코드는 다음과 같습니다. 나는 Cloudera 분포가 아니다.카프카 프로듀서 보내기 문자열이 작동하지 않습니다.
package kafka_test;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
public class DataMovement {
public static void main(String[] args) {
String kafkaTopic = args[0];
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "server:9092");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(kafkaTopic, null, "This program is running.");
producer.send(producerRecord);
producer.close();
}
}
나는 오류 메시지가 있지만 시간 제한하지 않습니다
또한 client.id 등 카프카, SSL, passowrd에 대한 정보를 많이 출력을
10분의 16/31 10:25:46 정보 utils.AppInfoParser : 카프카 버전 : 0.9.0.1 16/10/31 10:25:46 정보 utils.AppInfoParser : Kafka commitId : commitid 16/10/31 10:26:46 정보 생산자 .KafkaProducer : timeoutMillis = 9223372036854775807 ms 인 Kafka 제작자를 닫습니다.
예외가 있습니까? 또한 제작자를 닫기 전에 Thread.sleep (2000)을 사용하여 잠시 기다릴 수 있습니다. 제작자가 닫히면 kafka는 메시지를 topic으로 보내지 않습니다. – Shankar
또한 메시지 키를 설정하지 않으면 ProducerRecord를 두 개의 인수, 항목 이름 및 메시지와 함께 사용할 수 있습니다. – Shankar
시간 초과 오류입니다. 나는 편집을했다. 좋아요, 열쇠를 바꿔 볼게요. 내가 자려고하자. – Defcon