2017-03-24 2 views
3

kafka 주제에서 소비하는 데 KafkaMessageListenerContainer를 사용하고 있습니다. 다른 마이크로 서비스에 종속적 인 각 레코드를 처리하는 응용 프로그램 논리가 있습니다. 이제 각 레코드가 처리 된 후 오프셋을 수동으로 커밋합니다.스프링 카프카 소비자는 런타임에 오프셋을 찾으십니까?

그러나 응용 프로그램 논리가 실패하면 실패한 오프셋을 찾아 성공할 때까지 처리해야합니다. 이를 위해서는 마지막 오프셋을 수동으로 찾아야합니다.

아직 KafkaMessageListenerContainer에서 가능합니까?

+0

응용 프로그램 논리 이후에 오프셋을 적용하고 있습니다. 따라서 응용 프로그램 논리가 실패한 경우 오프셋을 커밋하지 않으면 오프셋이 앞으로 이동하지 않고 동일한 메시지를 다시 처리하게됩니다. 이게 당신의 문제를 해결합니까? – yaswanth

+1

@yaswanth 아니, 그런 것 같지 않아. 테스트를 시작할 때까지 나는 당신과 비슷한 가정을했습니다. ENABLE_AUTO_COMMIT_CONFIG - 거짓 및 AcknowledgingMessageListener를 사용하여 AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE의 컨테이너 속성을 활성화했습니다. 기록 1을 보내고 주제 2에 기록 2를 보냈습니다. 기록 1 - 응용 논리가 실패했습니다. (나는 인정하지 않습니다.)이 시점에서 다음 소비자 투표는 다시 기록 1을 얻지 만 기록 2를 얻습니다. 구성 매개 변수에 의해 뭔가 고정 알려주세요! – sash

+0

당신 말이 맞아요! 나는 지금까지 거짓 가정하에있다. – yaswanth

답변

2

Seeking to a Specific Offset을 참조하십시오.

, 리스너는 다음과 같은 방법이있다 ConsumerSeekAware를 구현해야 추구하기 위해 :

void registerSeekCallback(ConsumerSeekCallback callback);

void onPartitionsAssigned(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

void onIdleContainer(Map<TopicPartition, Long> assignments, ConsumerSeekCallback callback);

컨테이너가 시작될 때 첫 번째가 호출 ; 이 콜백은 초기화 후 임의의 시간에 검색 할 때 사용해야합니다. 콜백에 대한 참조를 저장해야합니다. 여러 컨테이너 (또는 ConcurrentMessageListenerContainer)에서 동일한 수신기를 사용하는 경우 콜백은 ThreadLocal 또는 수신기 스레드에 의해 키잉 된 다른 구조로 저장해야합니다.

+0

그것은 그것을 지적 해 주셔서 감사합니다 :) – sash

관련 문제