0

confluent-3.0.1 플랫폼을 사용하고 Kafka-Elasticsearch 커넥터를 구축하고 있습니다. 이를 위해 Kafka에서 데이터를 가져 오기 위해 SinkConnector 및 SinkTask (Kafka-connect APIs)를 확장했습니다.Confluent 플랫폼의 Kafka-Connect API에서 max.poll.records를 설정하는 방법

이 코드의 일부로 SinkConnector의 taskConfigs 메서드를 확장하여 "max.poll.records"를 반환하여 한 번에 100 개의 레코드 만 가져옵니다. 하지만 작동하지 않고 동시에 모든 레코드를 가져 오는 중이고 규정 된 시간 내에 오프셋을 수행하지 못하고 있습니다. 어느 날 "max.poll.records"

public List<Map<String, String>> taskConfigs(int maxTasks) { 
    ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>(); 
    for (int i = 0; i < maxTasks; i++) { 
     Map<String, String> config = new HashMap<String, String>(); 
     config.put(ConfigurationConstants.CLUSTER_NAME, clusterName); 
     config.put(ConfigurationConstants.HOSTS, hosts); 
     config.put(ConfigurationConstants.BULK_SIZE, bulkSize); 
     config.put(ConfigurationConstants.IDS, elasticSearchIds); 
     config.put(ConfigurationConstants.TOPICS_SATELLITE_DATA, topics); 
     config.put(ConfigurationConstants.PUBLISH_TOPIC, topicTopublish); 
     config.put(ConfigurationConstants.TYPES, elasticSearchTypes); 
     config.put("max.poll.records", "100"); 

     configs.add(config); 
    } 
    return configs; 
    } 
+0

BTW, 지류 3.1 (발표 오늘) 요구 사항을 충족 할 경우에 Elasticsearch 싱크 커넥터를 포함하고있다. http://docs.confluent.io/3.1.0/connect/connect-elasticsearch/docs/index.html – shikhar

답변

3

당신은 커넥터 구성에 max.poll.records 같은 대부분의 카프카 소비자 CONFIGS을 재정의 할 수 없습니다를 구성하는 데 도움이 될 수 있습니다하시기 바랍니다. 그래도 Connect Worker 구성에서는 접두어 consumer.으로 시작할 수 있습니다.

+0

worker.properties를 만들고 속성 파일에 위 속성을 제공하고 아래 명령을 실행했습니다. sh ./bin/connect-standalone ./etc/schema-registry/connect-avrostandalone.properties ./etc/kafka-connect-elasticsearch/worker.properties ./etc/kafka-connect-elasticsearch/connector.properties> connectorlogs.log 하지만 예외가 발생했습니다. org.apache.kafka.common.config.ConfigException : 필수 값이없는 필수 구성 "connector.class"가 누락되었습니다. – Renukaradhya

+0

내 worker.properties에 "group.id"= operative1이 포함되어 있습니다. "operative1.max.poll.records"= 1000이고 내 connector.properties에 "connector.class"가 포함되어 있지만 여전히이 오류가 발생합니다. – Renukaradhya

+0

작업자 설정에서'consumer.max.poll.records = 1000'이 필요합니다. – shikhar

0

해결되었습니다. 나는 connect-avro-standalone.properties

group.id=mygroup 
consumer.max.poll.records=1000 

에 구성 아래에 추가 내 커넥터를 실행하기위한 명령 아래에 달렸다.

sh ./bin/connect-standalone ./etc/schema-registry/connect-avro-standalone.properties ./etc/kafka-connect-elasticsearch/connect-elasticsearch-sink.properties 
관련 문제