주어진 시간에 너무 많은 프로세스를 처리 할 수없는 큰 메시지 (RDBMS 테이블의 토지)를 수신하는 플로우가 있습니다. 이와 같이 <int:poller max-messages-per-poll="" />
을 사용하여 처리를 조절하고 과 capacity
을 <int:queue capacity="">
과 같이 설정합니다. 여러 스레드/트랜잭션이이 흐름에 참여하고 유스 케이스의 경우 이것이 받아 들일 수 있음을 이해합니다.스프링 통합에서 DynamicPoller를 구현하는 가장 좋은 방법
DB를 폴링하는 쿼리는 실행하는 데 다소 시간이 걸리므로 필요 이상으로 자주 실행하지 않으려합니다. 또한이 흐름이받는 메시지는 "버스트 (bursts)"로 들어오는 경향이 있습니다. 즉, 1000 개의 메시지가 생겨 1 시간 동안 수신되지 않을 수 있습니다.
내가 자주하고 싶은 메시지가 가득차는 경우를 제외하고는 드물게 폴링하는 dynamic-poller
을 사용하고 싶습니다. 모든 메시지가 처리 될 때까지 예를 들어 <int:poller max-messages-per-poll="100" />
이 있고 폴러가 100 개의 메시지를 읽은 것만 알면 처리해야하는 RDBMS에 더 많은 메시지가 있고 처리가 완료된 직후에 다시 폴링해야 할 가능성이 있습니다.
trigger
을 수정하는 방법을 제공하지 않으며, 이미 봄 통합 참조 "
7.1.5 Change Polling Rate at Runtime" 에서와
dynamic-poller
샘플 프로젝트 살펴 보았다 알고
Dynamic Poller 시작이야
하지만, 정말
내가 이것에 맞지 않을 수 있습니다.이 현재의 부하에 따라 주파수의 변경합니다
poller
필요하지만, 나는 아마도 게리는 "
Implementing High-Availability Architectures with Spring Integration"그의 이야기에 구현하는 것이 재미있을 것 같은 것을 언급 생각했다.
어떤 경우에도
poller
빈도를 변경하는 수업을 작성하는 것은 큰 문제가되지 않습니다. 좀 더 어려운 것은 출력 채널에 아무 것도 게시되지 않기 때문에 결과가 생성되지 않은 투표가 발생한시기를 아는 방법입니다.
일부 옵션은 내가 생각했습니다
가
<int:service-activator>
호출poller
의 채널에<int:wire-tap channel="" />
를 연결합니다. 서비스 활성기는 메시지 수를 검사하여DynamicPeriodicTrigger
에서poller
의period
을 조정합니다.
메시지가 수신되지 않으면 폴링주기가 불변으로 남아있게되므로 더 자주 폴링하도록 조정하면 메시지가 수신되지 않으면이 메시지가 표시되지 않습니다.# 1과 동일하지만,
period
다시initialDelay
옆에 트리거 이후에 발생하거나 일정 시간 후 되돌아갑니다DynamicPeriodicTrigger
에 로직을 추가 할 수 있습니다.<int:poller>
요소 내의<int:advice-chain>
요소를MethodInterceptor
구현으로 사용하십시오.
Artem이 제안한 것과 비슷합니다. link. 이 방법을 통해receive
메서드를 사용할 수는 있지만receive
메서드의 결과에 대한 액세스 권한을 부여하지 않습니다. 그러면 검색된 메시지 수가 줄어 듭니다. 이것은 게리가이 link에 대해 언급 한 것으로 확인 된 것 같습니다.요청 처리기 조언 체인은 특별한 경우입니다. 우리는 내부 종단점 방법에 대해서만 조언하고 다운 스트림 처리 (출력 채널에 대해서는)를하지 않도록주의해야했습니다.
우리가 전체 흐름을 조언하기 때문에 폴러에 조언하는 것이 더 간단합니다. "7.1.4 네임 스페이스 지원"하위 섹션 "AOP Advice chains"에서 설명한대로 MethodInterceptor 인터페이스를 구현하여 조언을 작성하기 만하면됩니다.
아주 간단한 조언을
:SourcePollingChannelAdapterFactoryBeanTests.testAdviceChain()
보기 ... 코드
adviceChain.add(new MethodInterceptor() {
public Object invoke(MethodInvocation invocation) throws Throwable {
adviceApplied.set(true);
return invocation.proceed();
}
});
이 단순히 조언이라고 주장하는 데 사용됩니다 제대로 호출; 실제 조언은 invocation.proceed() 전후에 코드를 추가합니다.실제로이 조언은 모든 방법을 조언하지만 단 하나 (
Callable.call()
) 만 있습니다.Message<T> receive()
방법을 찾는 포인트 컷으로AfterReturning
조언을 작성하십시오.복제본
JdbcPollingChannelAdapter
을 복제하고 새 클래스에 내 후크를 추가하십시오.아마도 게리가이 link에 대해 제안하는 것이 유용 할 수 있지만 "요지"링크는 더 이상 유효하지 않습니다.
업데이트 : 내가 구현 결국
옵션은 다음과 같은 것을 안쪽의
AfterReturningAdvice
를 사용하는 것이 었습니다.
원래 코드 :
<int-jdbc:inbound-channel-adapter id="jdbcInAdapter"
channel="inputChannel" data-source="myDataSource"
query="SELECT column1, column2 from tableA"
max-rows-per-poll="100">
<int:poller fixed-delay="10000"/>
</int-jdbc:inbound-channel-adapter>
새로운 코드 :이 봄 통합 아마도 폴러에 약간의 후크를 제공 할 수 있다고 생각에 좀 더 연구를 한
<bean id="jdbcDynamicTrigger" class="DynamicPeriodicTrigger">
<constructor-arg name="period" value="20000" />
</bean>
<bean id="jdbcPollerMetaData" class="org.springframework.integration.scheduling.PollerMetadata">
<property name="maxMessagesPerPoll" value="1000"/>
<property name="trigger" ref="jdbcDynamicTrigger"/>
</bean>
<bean id="pollMoreFrequentlyForHighVolumePollingStrategy" class="springintegration.scheduling.PollMoreFrequentlyForHighVolumePollingStrategy">
<property name="newPeriod" value="1"/>
<property name="adjustmentThreshold" value="100"/>
<property name="pollerMetadata" ref="jdbcPollerMetaData"/>
</bean>
<aop:config>
<aop:aspect ref="pollMoreFrequentlyForHighVolumePollingStrategy" >
<aop:after-returning pointcut="bean(jdbcInAdapterBean) and execution(* *.receive(..))" method="afterPoll" returning="returnValue"/>
</aop:aspect>
</aop:config>
<bean id="jdbcInAdapterBean" class="org.springframework.integration.jdbc.JdbcPollingChannelAdapter">
<constructor-arg ref="myDataSource" />
<constructor-arg value="SELECT column1, column2 from tableA" />
<property name="maxRowsPerPoll" value="100" />
</bean>
<int:inbound-channel-adapter id="jdbcInAdapter" ref="jdbcInAdapterBean"
channel="inputChannel"
auto-startup="false">
<int:poller ref="jdbcPollerMetaData" />
</int:inbound-channel-adapter>
개발자가 더 할 수 있도록 그들을 사용자 정의하십시오. 추가 정보를 위해
은 JIRA가 구현되지 않고 사람이 내가 여기에 코멘트를 추가하고 내가 GitHub의 또는 요점의 코드를 사용할 수 있도록하겠습니다 구현 코드에 관심이 있다면
https://jira.spring.io/browse/INT-3633를 참조하십시오.
네 말이 맞아요, 요점 링크는 여전히 유효합니다. 이 문제를 열었을 때 사용하고 있던 브라우저에서 내부적으로 차단 된 것 같습니다. –