2013-07-30 4 views
4

Spring JMSTemplate.receive (String) 메소드를 사용하여 큐에서 모든 메시지를 동기 모드로 가져 오려고합니다.Spring JMSTemplate은 하나의 트랜잭션에서 모든 메시지를 수신합니다.

문제는 항상 하나의 메시지 만 받는다는 것입니다. 여기에 코드입니다 :

@Transactional 
public List<Message> receiveAllFromQueue(String destination) { 
    List<Message> messages = new ArrayList<Message>(); 
    Message message; 
    while ((message = queueJmsTemplate.receive(destination)) != null) { 
    messages.add(message); 
    } 
    return messages; 
} 

내가 모든 메시지를 얻을 @Transactional 주석을 제거 할 수 있지만 메시지가 손실됩니다 예외가이 메시지를 처리하는 동안 나중에 그래서 만약 모든 트랜잭션에서 수행 할 경우

.

다음은 JMSTemplate bean의 정의입니다.

<bean id="queueJmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 
    <property name="connectionFactory" ref="connectionFactory" /> 
    <property name="pubSubDomain" value="false" /> 
    <property name="receiveTimeout" value="1" /> 
    <property name="sessionTransacted" value="true" /> 
</bean> 

달성하려는 목표는 하나의 트랜잭션을 갖는 것이고이 트랜잭션 내에서 모든 보류중인 메시지를 얻고 싶습니다.

답변

3

본인에게 답장을 보내 드리겠습니다. JMSTemplate이 지원하지 않는 것 같습니다. 일시적으로 해결할 수있는 유일한 방법은 JMSTemplate을 확장하고 JMSTemplate의 일부를 사용하는 새로운 메소드를 추가하는 것입니다. 불행히도 일부 메소드는 비공개이므로 복사해야합니다.

public class CustomQueueJmsTemplate extends JmsTemplateDelegate { 

    public List<Message> receiveAll(String destinationName) { 
    return receiveAll(destinationName, null); 
    } 

    public List<Message> receiveAll(final String destinationName, final String messageSelector) { 
    return execute(new SessionCallback<List<Message>>() { 
     @Override 
     public List<Message> doInJms(Session session) throws JMSException { 
     Destination destination = resolveDestinationName(session, destinationName); 
     return doReceiveAll(session, destination, messageSelector); 
     } 
    }, true); 
    } 

    private List<Message> doReceiveAll(Session session, Destination destination, String messageSelector) 
     throws JMSException 
    { 
    return doReceiveAll(session, createConsumer(session, destination, messageSelector)); 
    } 

    private List<Message> doReceiveAll(Session session, MessageConsumer consumer) throws JMSException { 
    try { 
     // Use transaction timeout (if available). 
     long timeout = getReceiveTimeout(); 
     JmsResourceHolder resourceHolder = (JmsResourceHolder) TransactionSynchronizationManager 
      .getResource(getConnectionFactory()); 
     if (resourceHolder != null && resourceHolder.hasTimeout()) { 
     timeout = resourceHolder.getTimeToLiveInMillis(); 
     } 

     // START OF MODIFIED CODE 
     List<Message> messages = new ArrayList<>(); 
     Message message; 
     while ((message = doReceive(consumer, timeout)) != null) { 
     messages.add(message); 
     } 
     // END OF MODIFIED CODE 

     if (session.getTransacted()) { 
     // Commit necessary - but avoid commit call within a JTA transaction. 
     if (isSessionLocallyTransacted(session)) { 
      // Transacted session created by this template -> commit. 
      JmsUtils.commitIfNecessary(session); 
     } 
     } else if (isClientAcknowledge(session)) { 
     // Manually acknowledge message, if any. 
     for (Message retrievedMessages : messages) { 
      retrievedMessages.acknowledge(); 
     } 
     } 
     return messages; 
    } 
    finally { 
     JmsUtils.closeMessageConsumer(consumer); 
    } 
    } 

    private Message doReceive(MessageConsumer consumer, long timeout) throws JMSException { 
    if (timeout == RECEIVE_TIMEOUT_NO_WAIT) { 
     return consumer.receiveNoWait(); 
    } else if (timeout > 0) { 
     return consumer.receive(timeout); 
    } else { 
     return consumer.receive(); 
    } 
    } 

} 
관련 문제