2015-02-06 3 views
2

주어진 시간에 너무 많은 프로세스를 처리 할 수없는 큰 메시지 (RDBMS 테이블의 토지)를 수신하는 플로우가 있습니다. 이와 같이 <int:poller max-messages-per-poll="" />을 사용하여 처리를 조절하고 과 capacity<int:queue capacity="">과 같이 설정합니다. 여러 스레드/트랜잭션이이 흐름에 참여하고 유스 케이스의 경우 이것이 받아 들일 수 있음을 이해합니다.스프링 통합에서 DynamicPoller를 구현하는 가장 좋은 방법

DB를 폴링하는 쿼리는 실행하는 데 다소 시간이 걸리므로 필요 이상으로 자주 실행하지 않으려합니다. 또한이 흐름이받는 메시지는 "버스트 (bursts)"로 들어오는 경향이 있습니다. 즉, 1000 개의 메시지가 생겨 1 시간 동안 수신되지 않을 수 있습니다.

내가 자주하고 싶은 메시지가 가득차는 경우를 제외하고는 드물게 폴링하는 dynamic-poller을 사용하고 싶습니다. 모든 메시지가 처리 될 때까지 예를 들어 <int:poller max-messages-per-poll="100" />이 있고 폴러가 100 개의 메시지를 읽은 것만 알면 처리해야하는 RDBMS에 더 많은 메시지가 있고 처리가 완료된 직후에 다시 폴링해야 할 가능성이 있습니다.

나는 봄 자연에 동적으로 만들 수있는 trigger을 수정하는 방법을 제공하지 않으며, 이미 봄 통합 참조 " 7.1.5 Change Polling Rate at Runtime" 에서와 dynamic-poller 샘플 프로젝트 살펴 보았다 알고 Dynamic Poller 시작이야
하지만, 정말
내가 이것에 맞지 않을 수 있습니다.이 현재의 부하에 따라 주파수의 변경합니다 poller 필요하지만, 나는 아마도 게리는 " Implementing High-Availability Architectures with Spring Integration"그의 이야기에 구현하는 것이 재미있을 것 같은 것을 언급 생각했다.
어떤 경우에도 poller 빈도를 변경하는 수업을 작성하는 것은 큰 문제가되지 않습니다. 좀 더 어려운 것은 출력 채널에 아무 것도 게시되지 않기 때문에 결과가 생성되지 않은 투표가 발생한시기를 아는 방법입니다.

일부 옵션은 내가 생각했습니다

  1. <int:service-activator> 호출 poller의 채널에 <int:wire-tap channel="" />를 연결합니다. 서비스 활성기는 메시지 수를 검사하여 DynamicPeriodicTrigger에서 pollerperiod을 조정합니다.
    메시지가 수신되지 않으면 폴링주기가 불변으로 남아있게되므로 더 자주 폴링하도록 조정하면 메시지가 수신되지 않으면이 메시지가 표시되지 않습니다.

  2. # 1과 동일하지만, period 다시 initialDelay 옆에 트리거 이후에 ​​발생하거나 일정 시간 후 되돌아갑니다 DynamicPeriodicTrigger에 로직을 추가 할 수 있습니다.

  3. <int:poller> 요소 내의 <int:advice-chain> 요소를 MethodInterceptor 구현으로 사용하십시오.
    Artem이 제안한 것과 비슷합니다. link. 이 방법을 통해 receive 메서드를 사용할 수는 있지만 receive 메서드의 결과에 대한 액세스 권한을 부여하지 않습니다. 그러면 검색된 메시지 수가 줄어 듭니다. 이것은 게리가이 link에 대해 언급 한 것으로 확인 된 것 같습니다.

    요청 처리기 조언 체인은 특별한 경우입니다. 우리는 내부 종단점 방법에 대해서만 조언하고 다운 스트림 처리 (출력 채널에 대해서는)를하지 않도록주의해야했습니다.

    우리가 전체 흐름을 조언하기 때문에 폴러에 조언하는 것이 더 간단합니다. "7.1.4 네임 스페이스 지원"하위 섹션 "AOP Advice chains"에서 설명한대로 MethodInterceptor 인터페이스를 구현하여 조언을 작성하기 만하면됩니다.

    아주 간단한 조언을 SourcePollingChannelAdapterFactoryBeanTests.testAdviceChain()보기 ... 코드

    :
    adviceChain.add(new MethodInterceptor() {
    public Object invoke(MethodInvocation invocation) throws Throwable {
    adviceApplied.set(true);
    return invocation.proceed();
    }
    });
    이 단순히 조언이라고 주장하는 데 사용됩니다 제대로 호출; 실제 조언은 invocation.proceed() 전후에 코드를 추가합니다.

    실제로이 조언은 모든 방법을 조언하지만 단 하나 (Callable.call()) 만 있습니다.

  4. Message<T> receive() 방법을 찾는 포인트 컷으로 AfterReturning 조언을 작성하십시오.

  5. 복제본 JdbcPollingChannelAdapter을 복제하고 새 클래스에 내 후크를 추가하십시오.

  6. 아마도 게리가이 link에 대해 제안하는 것이 유용 할 수 있지만 "요지"링크는 더 이상 유효하지 않습니다.

업데이트 : 내가 구현 결국
옵션은 다음과 같은 것을 안쪽의 AfterReturningAdvice를 사용하는 것이 었습니다.
원래 코드 :

<int-jdbc:inbound-channel-adapter id="jdbcInAdapter" 
    channel="inputChannel" data-source="myDataSource" 
    query="SELECT column1, column2 from tableA" 
    max-rows-per-poll="100"> 
    <int:poller fixed-delay="10000"/> 
</int-jdbc:inbound-channel-adapter> 

새로운 코드 :이 봄 통합 아마도 폴러에 약간의 후크를 제공 할 수 있다고 생각에 좀 더 연구를 한

<bean id="jdbcDynamicTrigger" class="DynamicPeriodicTrigger"> 
    <constructor-arg name="period" value="20000" /> 
</bean> 
<bean id="jdbcPollerMetaData" class="org.springframework.integration.scheduling.PollerMetadata"> 
    <property name="maxMessagesPerPoll" value="1000"/> 
    <property name="trigger" ref="jdbcDynamicTrigger"/> 
</bean> 
<bean id="pollMoreFrequentlyForHighVolumePollingStrategy" class="springintegration.scheduling.PollMoreFrequentlyForHighVolumePollingStrategy"> 
    <property name="newPeriod" value="1"/> 
    <property name="adjustmentThreshold" value="100"/> 
    <property name="pollerMetadata" ref="jdbcPollerMetaData"/> 
</bean> 
<aop:config> 
    <aop:aspect ref="pollMoreFrequentlyForHighVolumePollingStrategy" > 
     <aop:after-returning pointcut="bean(jdbcInAdapterBean) and execution(* *.receive(..))" method="afterPoll" returning="returnValue"/> 
    </aop:aspect> 
</aop:config> 
<bean id="jdbcInAdapterBean" class="org.springframework.integration.jdbc.JdbcPollingChannelAdapter"> 
    <constructor-arg ref="myDataSource" /> 
    <constructor-arg value="SELECT column1, column2 from tableA" /> 
    <property name="maxRowsPerPoll" value="100" /> 
</bean> 
<int:inbound-channel-adapter id="jdbcInAdapter" ref="jdbcInAdapterBean" 
    channel="inputChannel" 
    auto-startup="false"> 
    <int:poller ref="jdbcPollerMetaData" /> 
</int:inbound-channel-adapter> 

개발자가 더 할 수 있도록 그들을 사용자 정의하십시오. 추가 정보를 위해
은 JIRA가 구현되지 않고 사람이 내가 여기에 코멘트를 추가하고 내가 GitHub의 또는 요점의 코드를 사용할 수 있도록하겠습니다 구현 코드에 관심이 있다면

https://jira.spring.io/browse/INT-3633를 참조하십시오.

답변

1

JIRA 문제를 개봉 해 주셔서 감사합니다. 스택 오버플로가 확장 된 대화에 적합하지 않기 때문에이 기능에 대해 논의해야합니다.

그러나 위의 내용이 "..."이지만 "요지"링크는 더 이상 유효하지 않습니다. ". 그것은 나를 위해 잘 작동합니다 ... https://gist.github.com/garyrussell/5374267하지만 JIRA에서 논의 해봅시다.

+0

네 말이 맞아요, 요점 링크는 여전히 유효합니다. 이 문제를 열었을 때 사용하고 있던 브라우저에서 내부적으로 차단 된 것 같습니다. –

관련 문제