2016-07-11 2 views
2

자바와 함께 KafkaProducer를 사용하고 있습니다. 다음은 코드의 예입니다.카프카에서 연결 오류를 감지하는 방법

package com.mypackage.kafka.producer; 

import org.apache.kafka.clients.producer.KafkaProducer; 
import org.apache.kafka.clients.producer.ProducerRecord; 

import java.util.Properties; 

public class ProducerToTest 
{ 
    public static void main(String[] args) 
    { 
     Properties props = new Properties(); 
     props.put("bootstrap.servers", "localhost:9092"); 
     props.put("retries", 0); 
     props.put("batch.size", 16384); 
     props.put("linger.ms", 1); 
     props.put("buffer.memory", 33554432); 
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 

     KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); 
     for (int i = 0; i < 100; i++) 
     { 
      System.out.println(i); 
      producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))); 
     } 

     producer.close(); 
    } 
} 

문제가 있습니다. 내 카프카가 쓰러지고 내 코드를 실행하면 카프카가 다시 올 때까지 기다리지 않고 첫 번째 전송에서 멈 춥니 다.

연결 오류를 감지하거나 사용하려면 을 모든 실행을 중지하지 않는 방식으로 보냅니 까?

답변

0

생산자는 request.timeout.ms까지 다시 연결하려고합니다. 중개인이 무한정 기다릴 필요는 없습니다. 아래의 request.timeout.ms에 대한 세부 정보를 참조하십시오.

구성은 클라이언트가 요청 응답을 기다리는 최대 시간을 제어합니다. 제한 시간이 경과하기 전에 응답을받지 못하면 클라이언트는 필요한 경우 요청을 다시 보내거나 재시도가 모두 소모되면 요청을 실패합니다.

+0

'props.put ("request.timeout.ms", 10);을 추가하려고 시도했지만 _send_가 차단 된 상태입니다. – Logocomune

+0

@Logocomune 어떤 버전을 사용하고 계십니까? –

+0

kafka_2.10 버전 0.10.0.0을 사용합니다. – Logocomune

관련 문제