ActiveMQ JMS 대기열에서 읽고 일부 처리를 수행 한 결과를 원격 대상에 전달하는 Camel Route가 있습니다. 이 프로세스의 통신 부분이 실패하면 무한히 (대상이 "작동 중"이 될 때까지)을 다시 시도하고 싶습니다. 나는 transacted
경로를 만들고 redeliveryPolicy
을 설정하여이 문제를 해결할 수 있습니다. 재 전달은 Camel (camel.processor.RedeliveryPolicy
을 통해 라우트의 실패한 부분 다시 시도) 및 ActiveMQConnectionFactory (org.apache.activemq.RedeliveryPolicy
을 통해 전체 라우트를 다시 시도 함)에서 설정됩니다.Camel Route로 JMS 재 전달을 처리 할 수 있지만 메시지 삭제를 허용하는 방법은 무엇입니까?
또한 JMS 큐 (JMX를 통해 ActiveMQ와 통신하는 응용 프로그램을 통해 수행)에서 항목을 삭제할 수 있어야하고 처리가 다음 메시지로 이동해야한다는 요구 사항이 있습니다.
문제는 소비자를 cacheLevelName=CACHE_NONE
으로 설정하여 메시지 삭제를 허용하거나 연결 처리가 (소비자를 cacheLevelName=CACHE_CONSUMER
(으)로 설정하여) 다시 시도 할 수 있다는 것입니다. 경로가 jms:queue:myqueue
에서 읽고, 그리고 컴즈 예외, 낙타의 경우,이 설정으로
@Component
public class TransactedRoute extends SpringRouteBuilder {
@Override
public void configure() {
onException(Exception.class) // we would handle comms exceptions here
.redeliveryPolicyRef("redeliveryProfile")
.rollback();
from("jms:queue:myqueue?cacheLevelName=CACHE_CONSUMER")
.transacted("PROPAGATION_REQUIRED")
.bean(Bean1.class)
.to(DESTINATION); // this could throw a comms exception
}
}
: 매우 간단한 경로와
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${jms.brokerUrl}" />
<property name="redeliveryPolicy" ref="amqRedeliveryPolicy" />
<property name="prefetchPolicy">
<bean class="org.apache.activemq.ActiveMQPrefetchPolicy">
<property name="all" value="0"/>
</bean>
</property>
</bean>
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
<property name="transactionManager" ref="jmsTransactionManager"/>
<property name="transacted" value="true"/>
<property name="concurrentConsumers" value="1"/>
</bean>
<bean id="jms" class="org.apache.camel.component.jms.JmsComponent" >
<property name="configuration" ref="jmsConfig" />
</bean>
<!-- Redelivery Policy for ActiveMQ (the Broker), so it will perform retries of the entire route -->
<bean id="amqRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<property name="initialRedeliveryDelay" value="5000" />
<property name="redeliveryDelay" value="5000" />
<property name="maximumRedeliveries" value="-1" />
<property name="queue" value=">" />
</bean>
<!-- Redelivery Policy for Camel Internal, so it will retry from the part of the exchange that's failed in case of comms issues -->
<bean id="redeliveryProfile" class="org.apache.camel.processor.RedeliveryPolicy">
<property name="maximumRedeliveries" value="2"/>
<property name="redeliveryDelay" value="500"/>
</bean>
<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<constructor-arg>
<bean class="org.springframework.transaction.support.TransactionTemplate">
<property name="transactionManager" ref="jmsTransactionManager"/>
</bean>
</constructor-arg>
</bean>
<camel:camelContext id="testContext">
<camel:routeBuilder ref="transactedRoute" />
</camel:camelContext>
: 여기
내 현재 설정입니다 내부적으로to
부분의 경로를 두 번 다시 시도합니다 (
redeliveryProfile
). 그런 다음 5 초 후에 전체 경로를 통해 메시지가 다시 전송됩니다 (
amqRedeliveryPolicy
). 이것은 시험을 중단 할 때까지 계속됩니다.
그러나 ActiveMQ에서 메시지를 삭제하면 대기열에 더 이상 있지 않아도 메시지가 경로에서 계속 처리됩니다. 내가 할 소비자를 변경하는 경우
: 무시
from("jms:queue:myqueue?cacheLevelName=CACHE_NONE")
지금의 ActiveMQ에서 메시지를 삭제할 수 있습니다 및 경로가 처리를 중지 ...하지만 amqRedeliveryPolicy
, 메시지는 즉시 (더 5 시도되지 않습니다 두 번째 지연) 6 회 시도 (AMQ의 기본값) 후에 데드 레터 대기열에 넣습니다.
내 구성을 수정하여 두 가지를 모두 달성 할 수있는 방법이 있습니까?
아니면 라인을 따라 어딘가에 요점을 완전히 놓쳤습니까?
나는 좀 더 놀았으며 라우트의'to (...) '에'acknowledgementModeName'을'CLIENT_ACKNOWLEDGE'로 설정하면 메시지 재발신을 인식하고 계속 재 시도를한다는 것을 알았지 만 그 메시지는 카멜 내부 재 배달 후 즉시 재전송됩니다 (5 초 지연 없음). – icabod