제 3 자 라이브러리를 사용하지 않고 카프카 응용 프로그램에서 단위 테스트를 수행해야합니다.단위 테스트를위한 카프카 항목 지우기
지금 당장 내 문제는 테스트 사이에 모든 주제를 정리하고 싶지만 어떻게해야할지 모르겠다는 것입니다.
이것은 내 임시 솔루션입니다. 각 테스트 후에 생성 된 모든 메시지를 커밋하고 모든 테스트 소비자를 동일한 소비자 그룹에 포함시킵니다.
override protected def afterEach():Unit={
val cleanerConsumer= newConsumer(Seq.empty)
val topics=cleanerConsumer.listTopics()
println("pulisco")
cleanerConsumer.subscribe(topics.keySet())
cleanerConsumer.poll(100)
cleanerConsumer.commitSync()
cleanerConsumer.close()
}
이것은 작동하지 않으며 이유를 알지 못합니다.
예를 들어 테스트 중에 새로운 소비자를 만들 때 messages
에는 이전 테스트에서 생성 된 메시지가 포함됩니다.
val consumerProbe = newConsumer(SMSGatewayTopic)
val messages = consumerProbe.poll(1000)
어떻게 해결할 수 있습니까?
카프카가 지속성 메시지 저장소이며 소비자가가 소비를 시작 오프셋을 결정할 수 있습니다. 마지막으로 오프셋 한 것을 기억하고 그 후에 소비하기 시작하면됩니다. –
예를 들어 다음과 같이 내장 브로커를 사용해보십시오. https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java –
왜이 질문은 Java로 태그가 붙었습니까? 결과적으로 이는 매우 자바에 특정한 검색에서 나타난다. 별로 도움이되지 않습니다. – user1283068