confluent-3.0.1 플랫폼을 사용하고 Kafka-Elasticsearch 커넥터를 구축하고 있습니다. 이를 위해 Kafka에서 데이터를 가져 오기 위해 SinkConnector 및 SinkTask (Kafka-connect APIs)를 확장했습니다.Confluent 플랫폼의 Kafka-Connect API에서 max.poll.records를 설정하는 방법
이 코드의 일부로 SinkConnector의 taskConfigs 메서드를 확장하여 "max.poll.records"를 반환하여 한 번에 100 개의 레코드 만 가져옵니다. 하지만 작동하지 않고 동시에 모든 레코드를 가져 오는 중이고 규정 된 시간 내에 오프셋을 수행하지 못하고 있습니다. 어느 날 "max.poll.records"
public List<Map<String, String>> taskConfigs(int maxTasks) {
ArrayList<Map<String, String>> configs = new ArrayList<Map<String, String>>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> config = new HashMap<String, String>();
config.put(ConfigurationConstants.CLUSTER_NAME, clusterName);
config.put(ConfigurationConstants.HOSTS, hosts);
config.put(ConfigurationConstants.BULK_SIZE, bulkSize);
config.put(ConfigurationConstants.IDS, elasticSearchIds);
config.put(ConfigurationConstants.TOPICS_SATELLITE_DATA, topics);
config.put(ConfigurationConstants.PUBLISH_TOPIC, topicTopublish);
config.put(ConfigurationConstants.TYPES, elasticSearchTypes);
config.put("max.poll.records", "100");
configs.add(config);
}
return configs;
}
BTW, 지류 3.1 (발표 오늘) 요구 사항을 충족 할 경우에 Elasticsearch 싱크 커넥터를 포함하고있다. http://docs.confluent.io/3.1.0/connect/connect-elasticsearch/docs/index.html – shikhar