2017-09-06 2 views
0

나는 현재 소비자 메시지를 보내기 위해 카프카와 스프링 카프카를 사용하고있다.봄 - 카프카와 카프카 0.10

하지만 문제가 동일한 주제에 대한 몇 가지 소비자 실행을 가지고 몇 가지 질문이 있습니다

1 - 내 소비자 잠시 후 분리 문제가 다시 연결을하는 경향이

정기적으로 발생 WARN 다음 내 소비자의 경우 :

2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] f.b.poc.crawler.kafka.KafkaListener  : Consuming {"some-stuff": "yes"} from topic [job15] 
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] f.b.p.c.w.services.impl.CrawlingService : Start of crawling 
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] f.b.p.c.w.services.impl.CrawlingService : Url has already been treated ==> skipping 
2017-09-06 15:32:35.054 WARN 5203 --- [nListener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Auto-commit of offsets {job15-3=OffsetAndMetadata{offset=11547, metadata=''}, job15-2=OffsetAndMetadata{offset=15550, metadata=''}} failed for group group-3: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. 
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : Revoking previously assigned partitions [job15-3, job15-2] for group group-3 
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] s.k.l.ConcurrentMessageListenerContainer : partitions revoked:[job15-3, job15-2] 
2017-09-06 15:32:35.054 INFO 5203 --- [nListener-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : (Re-)joining group group-3 

이렇게하면 소비자가 몇 초 동안 멈추고 기다리게됩니다.

메시지에 언급 된 바와 같이 소비자 session.timeout.ms30000과 같이 증가 시켰습니다. 나는 여전히 메시지를 받는다. 제공된 로그에서 볼 수 있듯이 레코드 연결이 끊어지면 연결이 끊어집니다. ... 30 대 무의미하기 훨씬 전에. 나는 그들이 동일한 메시지를 치료하는 경향이 있음을보고 내 소비자의 로그를 보면서

2 개의 소비자 응용 프로그램은 정말 자주

같은 메시지를 수신합니다. 나는 카프카가 at-least-once 인 것을 이해했다. 그러나 나는 많은 중복을 만날 것이라고 생각하지 못했다. 바라 건데 나는 redis를 사용하지만 아마도 내가해야 할 일은 약간의 튜닝/속성을 이해하지 못했을 것입니다.

하는 코드

참고 : 나는 auto-commit=trueConcurrentMessageListenerContainer를 사용하지만, 1 개 스레드로 실행하고 있습니다. 소비자가 스레드로부터 안전하지 않은 서비스를 사용하기 때문에 동일한 응용 프로그램의 여러 인스턴스가 시작됩니다. 여기

KafkaContext.java

@Slf4j 
@Configuration 
@EnableConfigurationProperties(value = KafkaConfig.class) 
class KafkaContext { 

    @Bean(destroyMethod = "stop") 
    public ConcurrentMessageListenerContainer kafkaInListener(IKafkaListener listener, KafkaConfig config) { 
     final ContainerProperties containerProperties = 
       new ContainerProperties(config.getIn().getTopic()); 
     containerProperties.setMessageListener(listener); 
     final DefaultKafkaConsumerFactory<Integer, String> defaultKafkaConsumerFactory = 
       new DefaultKafkaConsumerFactory<>(consumerConfigs(config)); 

     final ConcurrentMessageListenerContainer messageListenerContainer = 
       new ConcurrentMessageListenerContainer<>(defaultKafkaConsumerFactory, containerProperties); 

     messageListenerContainer.setConcurrency(config.getConcurrency()); 
     messageListenerContainer.setAutoStartup(false); 
     return messageListenerContainer; 
    } 

    private Map<String, Object> consumerConfigs(KafkaConfig config) { 
     final String kafkaHost = config.getHost() + ":" + config.getPort(); 
     log.info("Crawler_Worker connecting to kafka at {} with consumerGroup {}", kafkaHost, config.getIn().getGroupId()); 
     final Map<String, Object> props = new HashMap<>(); 
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaHost); 
     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true); 
     props.put(ConsumerConfig.GROUP_ID_CONFIG, config.getIn().getGroupId()); 
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JacksonNextSerializer.class); 
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); 
     props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30000); 
     return props; 
    } 

} 

리스너

@Slf4j 
@Component 
class KafkaListener implements IKafkaListener { 

    private final ICrawlingService crawlingService; 

    @Autowired 
    public KafkaListener(ICrawlingService crawlingService) { 
     this.crawlingService = crawlingService; 
    } 

    @Override 
    public void onMessage(ConsumerRecord<Integer, Next> consumerRecord) { 
     log.info("Consuming {} from topic [{}]", JSONObject.wrap(consumerRecord.value()), consumerRecord.topic()); 

     consumerService.apply(consumerRecord.value()); 
    } 
} 

답변

0

가장 큰 문제는 소비자 그룹이 지속적으로 재조정되고 있다는 점이다. session.timeout.ms 늘리는 것이 맞지만 구성에이 구성이 적용되지 않습니다. 제거하십시오 :

props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 30000); 

및 설정 :

props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10); 
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 30000); 

당신은 브로커와의 통신에 더 나은 성능을 얻을 수 MAX_POLL_RECORDS_CONFIG을 증가시킬 수있다. 그러나 한 스레드에서만 메시지를 처리하면이 값을 낮게 유지하는 것이 안전합니다.

+0

안녕하세요, 가끔은 누군가가 코드를 살펴야합니다 ... 잘못된 매개 변수를 선택했습니다. 고맙습니다. 오늘이 문제를 해결하고 업데이트 할 것입니다. – ogdabou

관련 문제