2017-05-19 1 views
1

ActiveMQ JMS 대기열에서 읽고 일부 처리를 수행 한 결과를 원격 대상에 전달하는 Camel Route가 있습니다. 이 프로세스의 통신 부분이 실패하면 무한히 (대상이 "작동 중"이 될 때까지)을 다시 시도하고 싶습니다. 나는 transacted 경로를 만들고 redeliveryPolicy을 설정하여이 문제를 해결할 수 있습니다. 재 전달은 Camel (camel.processor.RedeliveryPolicy을 통해 라우트의 실패한 부분 다시 시도) 및 ActiveMQConnectionFactory (org.apache.activemq.RedeliveryPolicy을 통해 전체 라우트를 다시 시도 함)에서 설정됩니다.Camel Route로 JMS 재 전달을 처리 할 수 ​​있지만 메시지 삭제를 허용하는 방법은 무엇입니까?

또한 JMS 큐 (JMX를 통해 ActiveMQ와 통신하는 응용 프로그램을 통해 수행)에서 항목을 삭제할 수 있어야하고 처리가 다음 메시지로 이동해야한다는 요구 사항이 있습니다.

문제는 소비자를 cacheLevelName=CACHE_NONE으로 설정하여 메시지 삭제를 허용하거나 연결 처리가 (소비자를 cacheLevelName=CACHE_CONSUMER (으)로 설정하여) 다시 시도 할 수 있다는 것입니다. 경로가 jms:queue:myqueue에서 읽고, 그리고 컴즈 예외, 낙타의 경우,이 설정으로

@Component 
public class TransactedRoute extends SpringRouteBuilder { 
    @Override 
    public void configure() { 
    onException(Exception.class) // we would handle comms exceptions here 
     .redeliveryPolicyRef("redeliveryProfile") 
     .rollback(); 

    from("jms:queue:myqueue?cacheLevelName=CACHE_CONSUMER") 
     .transacted("PROPAGATION_REQUIRED") 
     .bean(Bean1.class) 
     .to(DESTINATION); // this could throw a comms exception 
    } 
} 

: 매우 간단한 경로와

<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager"> 
    <property name="connectionFactory" ref="jmsConnectionFactory"/> 
</bean> 

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 
    <property name="brokerURL" value="${jms.brokerUrl}" /> 
    <property name="redeliveryPolicy" ref="amqRedeliveryPolicy" /> 
    <property name="prefetchPolicy"> 
    <bean class="org.apache.activemq.ActiveMQPrefetchPolicy"> 
     <property name="all" value="0"/> 
    </bean> 
    </property> 
</bean> 

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> 
    <property name="connectionFactory" ref="jmsConnectionFactory"/> 
    <property name="transactionManager" ref="jmsTransactionManager"/> 
    <property name="transacted" value="true"/> 
    <property name="concurrentConsumers" value="1"/> 
</bean> 

<bean id="jms" class="org.apache.camel.component.jms.JmsComponent" > 
    <property name="configuration" ref="jmsConfig" /> 
</bean> 

<!-- Redelivery Policy for ActiveMQ (the Broker), so it will perform retries of the entire route --> 
<bean id="amqRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> 
    <property name="initialRedeliveryDelay" value="5000" /> 
    <property name="redeliveryDelay" value="5000" /> 
    <property name="maximumRedeliveries" value="-1" /> 
    <property name="queue" value=">" /> 
</bean> 

<!-- Redelivery Policy for Camel Internal, so it will retry from the part of the exchange that's failed in case of comms issues --> 
<bean id="redeliveryProfile" class="org.apache.camel.processor.RedeliveryPolicy"> 
    <property name="maximumRedeliveries" value="2"/> 
    <property name="redeliveryDelay" value="500"/> 
</bean> 

<bean id="PROPAGATION_REQUIRED" class="org.apache.camel.spring.spi.SpringTransactionPolicy"> 
    <constructor-arg> 
    <bean class="org.springframework.transaction.support.TransactionTemplate"> 
     <property name="transactionManager" ref="jmsTransactionManager"/> 
    </bean> 
    </constructor-arg> 
</bean> 

<camel:camelContext id="testContext"> 
    <camel:routeBuilder ref="transactedRoute" /> 
</camel:camelContext> 

: 여기

내 현재 설정입니다 내부적으로 to 부분의 경로를 두 번 다시 시도합니다 ( redeliveryProfile). 그런 다음 5 초 후에 전체 경로를 통해 메시지가 다시 전송됩니다 ( amqRedeliveryPolicy). 이것은 시험을 중단 할 때까지 계속됩니다.

그러나 ActiveMQ에서 메시지를 삭제하면 대기열에 더 이상 있지 않아도 메시지가 경로에서 계속 처리됩니다. 내가 할 소비자를 변경하는 경우

: 무시

from("jms:queue:myqueue?cacheLevelName=CACHE_NONE") 

지금의 ActiveMQ에서 메시지를 삭제할 수 있습니다 및 경로가 처리를 중지 ...하지만 amqRedeliveryPolicy, 메시지는 즉시 (더 5 시도되지 않습니다 두 번째 지연) 6 회 시도 (AMQ의 기본값) 후에 데드 레터 대기열에 넣습니다.

내 구성을 수정하여 두 가지를 모두 달성 할 수있는 방법이 있습니까?

아니면 라인을 따라 어딘가에 요점을 완전히 놓쳤습니까?

+0

나는 좀 더 놀았으며 라우트의'to (...) '에'acknowledgementModeName'을'CLIENT_ACKNOWLEDGE'로 설정하면 메시지 재발신을 인식하고 계속 재 시도를한다는 것을 알았지 만 그 메시지는 카멜 내부 재 배달 후 즉시 재전송됩니다 (5 초 지연 없음). – icabod

답변

0

JMX를 통해 대기열에서 특정 메시지를 삭제하기 위해 사람의 개입이 필요하다면 나쁜 디자인과 약간 비슷합니다.

아마도 X 번 실패한 시도 후에도 AMQ가 메시지를 DLQ로 옮길 수 있습니다. 재 배달도 영원히 좋지 않습니다. 그러나 6 회 시도가 약간이라면 기본값을 더 높은 값으로 올릴 수 있습니다.

그런 다음 DLQ 대기열에서 메시지를 검사하여 메시지가 실패한 이유를 이해하고, JMX 또는 다른 도구를 통해 DLQ 대기열을 안전하게 삭제하거나 삭제할 수 있습니다.

AMQ는 하나의 공통 DLQ 대기열 대신 DLQ 접두사 등을 사용하도록 구성 할 수있는 대기열 당 DLQ를 허용합니다.

+0

예, 이상한 디자인입니다. 우리가 읽은 대기열은 실제로 사용자가 응용 프로그램의 다른 부분을 통해 볼 수 있으며 사용자가 메시지를 삭제할 수있는 요구 사항이 있습니다. 'to (...) '부분은 실제로 다운 될 수있는 원격 시스템입니다.이 경우 목적지가 올라올 때까지 다시 시도하거나 사용자가 메시지를 제거합니다. 기본적으로 우리는 comms 오류의 경우 트랜잭션을 롤백 할 것입니다. 나는 그 질문을 명확하게하기 위해이 질문을 변경합니다. – icabod

관련 문제