2011-08-27 5 views
12

JMS 및 ActiveMQ로 작업 해 왔습니다. 모든 것이 놀라운 일입니다. 나는 봄을 사용하지 않으며 또한 할 수 없다.JMS MessageListener에서 롤백 신호 보내기

인터페이스 javax.jms.MessageListener은 하나의 방법, onMessage을 가지고있다. 구현 내에서 예외가 throw 될 수 있습니다. 사실 예외가 발생하면 메시지가 제대로 처리되지 않아 다시 시도해야한다고 말합니다. 그래서 잠시 기다렸다가 다시 시도하기 위해 ActiveMQ가 필요합니다. 즉, JMS 트랜잭션을 롤백하기 위해 예외가 필요합니다.

어떻게 이러한 동작을 수행 할 수 있습니까?

아마도 ActiveMQ에서 일부 구성이 발견되지 않았습니다.

또는 ... 어쩌면 멀리 소비자에게 MessageListener의 등록과 함께 할 수와 같은 루프에서, 메시지 나 자신을 소비 할 수 : 대신에 리스너를 등록하는 스레드의 몇

while (true) { 
    // ... some administrative stuff like ... 
    session = connection.createSesstion(true, SESSION_TRANSACTED) 
    try { 
     Message m = receiver.receive(queue, 1000L); 
     theMessageListener.onMessage(m); 
     session.commit(); 
    } catch (Exception e) { 
     session.rollback(); 
     Thread.sleep(someTimeDefinedSomewhereElse); 
    } 
    // ... some more administrative stuff 
} 

.

또는 ... 어떻게 든/AOP/바이트 조작하여 MessageListener을 직접 처리 할 수 ​​있습니다.

당신은 어떤 경로를 택했을 것이며 그 이유는 무엇입니까?

참고 : MessageListener 코드를 완전히 제어 할 수 없습니다.

편집 개념 증명을위한 시험 :

당신은 Session.CLIENT_ACKNOWLEDGE에 승인 모드를 설정해야
@Test 
@Ignore("Interactive test, just a proof of concept") 
public void transaccionConListener() throws Exception { 
    final AtomicInteger atomicInteger = new AtomicInteger(0); 

    BrokerService brokerService = new BrokerService(); 

    String bindAddress = "vm://localhost"; 
    brokerService.addConnector(bindAddress); 
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter()); 
    brokerService.setUseJmx(false); 
    brokerService.start(); 

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress); 
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
    redeliveryPolicy.setInitialRedeliveryDelay(500); 
    redeliveryPolicy.setBackOffMultiplier(2); 
    redeliveryPolicy.setUseExponentialBackOff(true); 
    redeliveryPolicy.setMaximumRedeliveries(2); 

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); 
    activeMQConnectionFactory.setUseRetroactiveConsumer(true); 
    activeMQConnectionFactory.setClientIDPrefix("ID"); 
    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); 

    pooledConnectionFactory.start(); 

    Connection connection = pooledConnectionFactory.createConnection(); 
    Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE); 
    Queue helloQueue = session.createQueue("Hello"); 
    MessageConsumer consumer = session.createConsumer(helloQueue); 
    consumer.setMessageListener(new MessageListener() { 

     @Override 
     public void onMessage(Message message) { 
      TextMessage textMessage = (TextMessage) message; 
      try { 
       switch (atomicInteger.getAndIncrement()) { 
        case 0: 
         System.out.println("OK, first message received " + textMessage.getText()); 
         message.acknowledge(); 
         break; 
        case 1: 
         System.out.println("NOPE, second must be retried " + textMessage.getText()); 
         throw new RuntimeException("I failed, aaaaah"); 
        case 2: 
         System.out.println("OK, second message received " + textMessage.getText()); 
         message.acknowledge(); 
       } 
      } catch (JMSException e) { 
       e.printStackTrace(System.out); 
      } 
     } 
    }); 
    connection.start(); 

    { 
     // A client sends two messages... 
     Connection connection1 = pooledConnectionFactory.createConnection(); 
     Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     connection1.start(); 

     MessageProducer producer = session1.createProducer(helloQueue); 
     producer.send(session1.createTextMessage("Hello World 1")); 
     producer.send(session1.createTextMessage("Hello World 2")); 

     producer.close(); 
     session1.close(); 
     connection1.stop(); 
     connection1.close(); 
    } 
    JOptionPane.showInputDialog("I will wait, you watch the log..."); 

    consumer.close(); 
    session.close(); 
    connection.stop(); 
    connection.close(); 
    pooledConnectionFactory.stop(); 

    brokerService.stop(); 

    assertEquals(3, atomicInteger.get()); 
} 
+0

고맙습니다. 고맙습니다. 답변은 @Ammar입니다. 네가 둘 다 나를 올바른 길로 인도 했으므로 나는 둘 다 상향 투표했다. 그러나 아직 정답을 고르는 것은 아닙니다. 더 많은 테스트가 필요하기 때문입니다. –

답변

10

는, 당신은 설정 RedeliveryPolicy on your Connection/ConnectionFactory해야합니다. This page on ActiveMQ's website에는 수행해야 할 작업에 대한 유용한 정보가 포함되어 있습니다.

당신이 Spring을 사용하지 않기 때문에, 당신은 설정 (위의 링크 중 하나에서 가져온) 다음 코드와 유사한있는 RedeliveryPolicy 할 수 있습니다

RedeliveryPolicy policy = connection.getRedeliveryPolicy(); 
policy.setInitialRedeliveryDelay(500); 
policy.setBackOffMultiplier(2); 
policy.setUseExponentialBackOff(true); 
policy.setMaximumRedeliveries(2); 

편집 촬영을하여 코드 스 니펫이 답변에 추가되면 다음은 트랜잭션과 함께 작동하는 방법을 보여줍니다. 주석 처리 된 Session.rollback() 메소드로이 코드를 시도하면 SESION_TRANSACTED 및 Session을 사용하여 볼 수 있습니다.커밋/롤백 예상대로 작동합니다

@Test 
public void test() throws Exception { 
    final AtomicInteger atomicInteger = new AtomicInteger(0); 

    BrokerService brokerService = new BrokerService(); 

    String bindAddress = "vm://localhost"; 
    brokerService.addConnector(bindAddress); 
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter()); 
    brokerService.setUseJmx(false); 
    brokerService.start(); 

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress); 
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy(); 
    redeliveryPolicy.setInitialRedeliveryDelay(500); 
    redeliveryPolicy.setBackOffMultiplier(2); 
    redeliveryPolicy.setUseExponentialBackOff(true); 
    redeliveryPolicy.setMaximumRedeliveries(2); 

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy); 
    activeMQConnectionFactory.setUseRetroactiveConsumer(true); 
    activeMQConnectionFactory.setClientIDPrefix("ID"); 

    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory); 

    pooledConnectionFactory.start(); 

    Connection connection = pooledConnectionFactory.createConnection(); 
    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED); 
    Queue helloQueue = session.createQueue("Hello"); 
    MessageConsumer consumer = session.createConsumer(helloQueue); 
    consumer.setMessageListener(new MessageListener() { 

     public void onMessage(Message message) { 
      TextMessage textMessage = (TextMessage) message; 
      try { 
       switch (atomicInteger.getAndIncrement()) { 
        case 0: 
         System.out.println("OK, first message received " + textMessage.getText()); 
         session.commit(); 
         break; 
        case 1: 
         System.out.println("NOPE, second must be retried " + textMessage.getText()); 
         session.rollback(); 
         throw new RuntimeException("I failed, aaaaah"); 
        case 2: 
         System.out.println("OK, second message received " + textMessage.getText()); 
         session.commit(); 
       } 
      } catch (JMSException e) { 
       e.printStackTrace(System.out); 
      } 
     } 
    }); 
    connection.start(); 

    { 
     // A client sends two messages... 
     Connection connection1 = pooledConnectionFactory.createConnection(); 
     Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 
     connection1.start(); 

     MessageProducer producer = session1.createProducer(helloQueue); 
     producer.send(session1.createTextMessage("Hello World 1")); 
     producer.send(session1.createTextMessage("Hello World 2")); 

     producer.close(); 
     session1.close(); 
     connection1.stop(); 
     connection1.close(); 
    } 
    JOptionPane.showInputDialog("I will wait, you watch the log..."); 

    consumer.close(); 
    session.close(); 
    connection.stop(); 
    connection.close(); 
    pooledConnectionFactory.stop(); 

    assertEquals(3, atomicInteger.get()); 
} 

}

+0

그건 작동하지 않았다. 그러나 올바른 방향으로 나를 지적했다. DUPS_OK_ACKNOWLEDGE는 최소한의 노력을 기울여야하는 것 같습니다. –

+0

세션에서 올바르게 작업하지 않기 때문에 코드 전체를 붙여 넣어야합니다. DUPS_OK_ACKNOWLEDGE는 클라이언트 확인이 지연되고 브로커가 클라이언트가 최종적으로 ack를 수행 할 때까지 메시지를 계속 재전송하기 때문에 작동하는 것처럼 보입니다. – whaley

+0

개념 증명을 붙여 넣었습니다. 나는 DUPS_OK_ACKNOWLEDGE만으로 작동하도록 만들 수 있으며 message.acknowledgement는 차이를 만드는 것처럼 보이지 않습니다. –

2

, 클라이언트는 메시지의 acknowledge 메소드를 호출 해 메시지를 인정한다.

QueueSession session = connection.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);

는 그 다음 메시지를 처리 ​​한 후에 그 메시지를 제거하기 위해 Message.acknowledge() 메소드를 호출 할 필요가있다. 당신이 당신의 확인 모드로 SESSION_TRANSACTED를 사용하려면

Message message = ...; 
// Processing message 

message.acknowledge(); 
+0

작동하지 않습니다. _message.acknowledge() _가 호출되지 않더라도 _onMessage_는 여전히 한 번 호출됩니다. –

+0

승인 모드를 올바르게 설정 했습니까? 반드시 Session.CLIENT_ACKNOWLEDGE로 설정해야합니다! – Ammar

+0

하지만 (false, Session.DUPS_OK_ACKNOWLEDGE) ... message.acknowledge()가 트릭을 수행하지 않는 것 같습니다. –

0

세션이 거래되는 경우, "acknowledgeMode"당신의 세션이 거래 떠나 Session.rollback이와여 session.commit를 사용 anyways..So 무시 트랜잭션을 커밋하거나 롤백 할 수 있습니다.

+1

(내) 문제는 세션이 MessageListener.onMessage (Message) 내에서 액세스 할 수 없다는 것입니다. –