JMS 및 ActiveMQ로 작업 해 왔습니다. 모든 것이 놀라운 일입니다. 나는 봄을 사용하지 않으며 또한 할 수 없다.JMS MessageListener에서 롤백 신호 보내기
인터페이스 javax.jms.MessageListener
은 하나의 방법, onMessage
을 가지고있다. 구현 내에서 예외가 throw 될 수 있습니다. 사실 예외가 발생하면 메시지가 제대로 처리되지 않아 다시 시도해야한다고 말합니다. 그래서 잠시 기다렸다가 다시 시도하기 위해 ActiveMQ가 필요합니다. 즉, JMS 트랜잭션을 롤백하기 위해 예외가 필요합니다.
어떻게 이러한 동작을 수행 할 수 있습니까?
아마도 ActiveMQ에서 일부 구성이 발견되지 않았습니다.
또는 ... 어쩌면 멀리 소비자에게 MessageListener
의 등록과 함께 할 수와 같은 루프에서, 메시지 나 자신을 소비 할 수 : 대신에 리스너를 등록하는 스레드의 몇
while (true) {
// ... some administrative stuff like ...
session = connection.createSesstion(true, SESSION_TRANSACTED)
try {
Message m = receiver.receive(queue, 1000L);
theMessageListener.onMessage(m);
session.commit();
} catch (Exception e) {
session.rollback();
Thread.sleep(someTimeDefinedSomewhereElse);
}
// ... some more administrative stuff
}
.
또는 ... 어떻게 든/AOP/바이트 조작하여 MessageListener
을 직접 처리 할 수 있습니다.
당신은 어떤 경로를 택했을 것이며 그 이유는 무엇입니까?
참고 : MessageListener
코드를 완전히 제어 할 수 없습니다.
편집 개념 증명을위한 시험 :
당신은 Session.CLIENT_ACKNOWLEDGE에 승인 모드를 설정해야@Test
@Ignore("Interactive test, just a proof of concept")
public void transaccionConListener() throws Exception {
final AtomicInteger atomicInteger = new AtomicInteger(0);
BrokerService brokerService = new BrokerService();
String bindAddress = "vm://localhost";
brokerService.addConnector(bindAddress);
brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
brokerService.setUseJmx(false);
brokerService.start();
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(500);
redeliveryPolicy.setBackOffMultiplier(2);
redeliveryPolicy.setUseExponentialBackOff(true);
redeliveryPolicy.setMaximumRedeliveries(2);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
activeMQConnectionFactory.setUseRetroactiveConsumer(true);
activeMQConnectionFactory.setClientIDPrefix("ID");
PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);
pooledConnectionFactory.start();
Connection connection = pooledConnectionFactory.createConnection();
Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);
Queue helloQueue = session.createQueue("Hello");
MessageConsumer consumer = session.createConsumer(helloQueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
try {
switch (atomicInteger.getAndIncrement()) {
case 0:
System.out.println("OK, first message received " + textMessage.getText());
message.acknowledge();
break;
case 1:
System.out.println("NOPE, second must be retried " + textMessage.getText());
throw new RuntimeException("I failed, aaaaah");
case 2:
System.out.println("OK, second message received " + textMessage.getText());
message.acknowledge();
}
} catch (JMSException e) {
e.printStackTrace(System.out);
}
}
});
connection.start();
{
// A client sends two messages...
Connection connection1 = pooledConnectionFactory.createConnection();
Session session1 = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection1.start();
MessageProducer producer = session1.createProducer(helloQueue);
producer.send(session1.createTextMessage("Hello World 1"));
producer.send(session1.createTextMessage("Hello World 2"));
producer.close();
session1.close();
connection1.stop();
connection1.close();
}
JOptionPane.showInputDialog("I will wait, you watch the log...");
consumer.close();
session.close();
connection.stop();
connection.close();
pooledConnectionFactory.stop();
brokerService.stop();
assertEquals(3, atomicInteger.get());
}
고맙습니다. 고맙습니다. 답변은 @Ammar입니다. 네가 둘 다 나를 올바른 길로 인도 했으므로 나는 둘 다 상향 투표했다. 그러나 아직 정답을 고르는 것은 아닙니다. 더 많은 테스트가 필요하기 때문입니다. –