2013-01-31 2 views
0

apache kafka의 메시지를 소비하는 스프링 통합에서 사용자 정의 인바운드 채널 어댑터를 구현하려고합니다. 스프링 통합 예제를 기반으로 MessageSource 인터페이스를 구현하고 kafka에서 소비 된 Message를 반환하는 receive() 메서드를 구현하는 클래스를 만들어야한다는 것을 알게되었습니다. 그러나 consumer example in kafka을 기반으로하면 KafkaStream의 메시지 반복기는 BlockingQueue를 통해 지원됩니다. 따라서 대기열에 메시지가 없으면 스레드가 차단됩니다.Kafka 용 Spring Integration InboundChannelAdapter 구현

그래서이 메소드는 소비 할 때까지 차단할 수 있으므로 receive() 메소드를 구현하는 가장 좋은 방법은 무엇입니까?

더 일반적인 의미에서, 소비 할 준비가 될 때까지 차단되는 스트리밍 메시지 소스에 대한 맞춤 인바운드 채널을 어떻게 구현합니까?

답변

4

receive() 메서드는 기본 작업이 인터럽트 된 스레드에 제대로 응답하는 한 차단할 수 있으며 인바운드 채널 어댑터 관점에서 기본 소스의 예상에 따라 차단할 수 있습니다. 고정 지연 트리거를 사용하십시오. 예를 들어, "긴 폴링"은 매우 작은 지연 값이 제공 될 때 이벤트 중심 동작을 시뮬레이션 할 수 있습니다.

JMS 폴링 MessageSource 구현에서도 비슷한 상황이 발생합니다. 거기에서 기본 동작은 JmsTemplate의 receive() 메소드 중 하나에 의해 처리됩니다. JmsTemplate 자체는 타임 아웃 값을 설정할 수 있습니다. 즉, 최대 5 초 동안 차단하도록 선택할 수 있지만 각 차단 수신 호출간에 매우 짧은 지연 트리거가있을 수 있습니다. 또는 수신 제한 시간을 무기한 지정할 수 있습니다. 결정은 궁극적으로 기본 리소스, 메시지 처리량 등의 기대에 달려 있습니다.

또한 카프카 어댑터를 직접 탐색하고 있음을 알려 드리고자합니다. Spring 통합 확장 저장소에서이 작업을 공동으로 수행하고 싶습니까?

안부, 마크

+0

덕분에 @mfisher, 방법 나는 봄 - 통합 - 확장 저장소와 협력에 대해 논의하기 위해 연락 할 수 ...? – Raja

관련 문제