2014-06-18 3 views
3

저는 Spring Integration Kafka 확장을 사용하여 Java 애플리케이션의 Kafka에서 메시지를 읽고 처리합니다. Zookeeper에서 오프셋을 완전히 관리 할 수없는 상위 수준의 소비자 API를 사용한다는 것을 알고 있습니다.스프링 통합 Kafka 및 오프셋 관리

내 경우에는 메시지가 처리 된 후 Zookeeper에 오프셋을 적용하기 위해 auto.commit.enable = false입니다. 처리가 실패하면 오프셋은 커밋되지 않으며 우리는 Zookeeper로부터 오프셋을 시작으로 구성된 동일한 시간에 동일한 메시지를 다시 처리해야합니다. 하지만 아파치 카프카 클라이언트가 메모리에서 상쇄되도록하기 때문에 작동하지 않습니다. 이 사육사보다 더 큰 다음 하나를 소비하여 메시지를 읽에

나는 kafka.consumer.ConsumerIterator이 오프셋을 처리하는 발견했고 경우 consumedOffset.

그래서 나는 Zookeeper에있는 오프셋에서 읽기 시작하기 위해 Kafka 클라이언트의 오프셋을 재설정하는 방법이 있는지 궁금합니다.

답변

1

<int-kafka> 설정을 공유하고 문제가있는 부분을 지적 하시겠습니까?

어쩌면이 작업을 수행하기에 충분할 수도 있습니다. MessageLeftOverTracker.clearMessagesLeftOver()?

저는 카프카와 잘 어울리지 않지만 스프링 통합이 무엇을하는지 알고 있습니다.

관련 문제