Signal a rollback from a JMS MessageListener

H Marcelo Morales · Aug 27, 2011 · Viewed 21.5k times

I've been working with JMS and ActiveMQ. Everything is working wonders. I am not using Spring, nor can I.

The interface javax.jms.MessageListener has only one method, onMessage. From within an implementation, there is a chance an exception will be thrown. If an exception does get thrown, I consider the message not properly processed, and it needs to be retried. So I need ActiveMQ to wait a little while and then retry; i.e., I need the thrown exception to roll back the JMS transaction.

How can I accomplish such a behaviour?

Maybe there is some configuration in ActiveMQ I wasn't able to find.

Or... maybe I could do away with registering MessageListeners on consumers and consume the messages myself, in a loop like:

while (true) {
    // ... some administrative stuff like ...
    session = connection.createSession(true, Session.SESSION_TRANSACTED);
    try {
        // receiver is a MessageConsumer created from this session for the queue;
        // receive(timeout) returns null if nothing arrives within the timeout
        Message m = receiver.receive(1000L);
        if (m != null) {
            theMessageListener.onMessage(m);
            session.commit();
        }
    } catch (Exception e) {
        session.rollback();
        Thread.sleep(someTimeDefinedSomewhereElse);
    }
    // ... some more administrative stuff
}

in a couple of threads, instead of registering the listener.

Or... I could somehow decorate/AOP/byte-manipulate the MessageListeners to do this themselves.
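
To make the decorator idea concrete, here is a minimal sketch of a wrapper that commits when the wrapped listener returns normally and rolls back when it throws. Listener and TxSession are hypothetical stand-ins for javax.jms.MessageListener and javax.jms.Session, cut down to the calls the sketch needs so it runs without a JMS provider. TransactionalListener is likewise an illustrative name, not an existing class.

```java
// Hypothetical stand-in for javax.jms.MessageListener (assumption for this sketch).
interface Listener {
    void onMessage(String message);
}

// Hypothetical stand-in for the commit/rollback part of javax.jms.Session.
interface TxSession {
    void commit() throws Exception;
    void rollback() throws Exception;
}

// Decorator: delegates to the wrapped listener, then commits on success
// or rolls back if the listener (or the commit itself) throws.
class TransactionalListener implements Listener {
    private final Listener delegate;
    private final TxSession session;

    TransactionalListener(Listener delegate, TxSession session) {
        this.delegate = delegate;
        this.session = session;
    }

    @Override
    public void onMessage(String message) {
        try {
            delegate.onMessage(message);
            session.commit();
        } catch (Exception failure) {
            try {
                session.rollback();
            } catch (Exception rollbackFailure) {
                // Keep the rollback problem attached to the original failure.
                failure.addSuppressed(rollbackFailure);
            }
            // The failure is reported and swallowed here; the rollback is what
            // asks the broker to redeliver the message.
            failure.printStackTrace();
        }
    }
}
```

With the real API, the same shape would wrap an existing MessageListener and be registered with consumer.setMessageListener(...), so the listener code itself would not need to change.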

What route would you take and why?

note: I don't have full control over the MessageListeners code.

Answer

whaley · Aug 27, 2011

If you want to use SESSION_TRANSACTED as your acknowledgement mode, then you need to set up a RedeliveryPolicy on your Connection/ConnectionFactory. This page on ActiveMQ's website also contains some good info on what you might need to do.

Since you aren't using Spring, you can set up a RedeliveryPolicy with something similar to the following code (taken from one of the above links):

// connection must be an org.apache.activemq.ActiveMQConnection here
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);
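
To see what those settings amount to, here is a rough sketch that computes the wait before each redelivery. It is a simplified model and not ActiveMQ's own code: it assumes the first redelivery waits for the initial delay and each later one multiplies the previous delay by the back-off multiplier, ignoring collision avoidance and any maximum-delay cap. BackOffSchedule and delays are made-up names for illustration.

```java
import java.util.ArrayList;
import java.util.List;

class BackOffSchedule {
    // Simplified model (an assumption): delay before redelivery n is
    // initialDelayMillis * multiplier^(n-1), for n = 1..maximumRedeliveries.
    static List<Long> delays(long initialDelayMillis, double multiplier, int maximumRedeliveries) {
        List<Long> result = new ArrayList<>();
        long delay = initialDelayMillis;
        for (int attempt = 0; attempt < maximumRedeliveries; attempt++) {
            result.add(delay);
            delay = (long) (delay * multiplier);
        }
        return result;
    }

    public static void main(String[] args) {
        // Same values as the policy above: 500 ms initial delay, multiplier 2, 2 redeliveries.
        System.out.println(delays(500, 2, 2)); // prints [500, 1000]
    }
}
```

Under this model, a message that keeps failing is delivered three times in total: the original delivery plus two redeliveries.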

Edit: Taking your code snippet added to the answer, the following shows how this works with transactions. Try this code with the Session.rollback() call commented out and you'll see that using SESSION_TRANSACTED and Session.commit/rollback works as expected:

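import static org.junit.Assert.assertEquals;

import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.swing.JOptionPane;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.pool.PooledConnectionFactory;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.junit.Test;

// The class name is illustrative; the original snippet showed only the method.
public class RedeliveryTest {

@Test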
public void test() throws Exception {
    final AtomicInteger atomicInteger = new AtomicInteger(0);

    BrokerService brokerService = new BrokerService();

    String bindAddress = "vm://localhost";
    brokerService.addConnector(bindAddress);
    brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
    brokerService.setUseJmx(false);
    brokerService.start();

    ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(bindAddress);
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(500);
    redeliveryPolicy.setBackOffMultiplier(2);
    redeliveryPolicy.setUseExponentialBackOff(true);
    redeliveryPolicy.setMaximumRedeliveries(2);

    activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
    activeMQConnectionFactory.setUseRetroactiveConsumer(true);
    activeMQConnectionFactory.setClientIDPrefix("ID");

    PooledConnectionFactory pooledConnectionFactory = new PooledConnectionFactory(activeMQConnectionFactory);

    pooledConnectionFactory.start();

    Connection connection = pooledConnectionFactory.createConnection();
    final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    Queue helloQueue = session.createQueue("Hello");
    MessageConsumer consumer = session.createConsumer(helloQueue);
    consumer.setMessageListener(new MessageListener() {

        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                switch (atomicInteger.getAndIncrement()) {
                    case 0:
                        System.out.println("OK, first message received " + textMessage.getText());
                        session.commit();
                        break;
                    case 1:
                        System.out.println("NOPE, second must be retried " + textMessage.getText());
                        session.rollback();
                        throw new RuntimeException("I failed, aaaaah");
                    case 2:
                        System.out.println("OK, second message received " + textMessage.getText());
                        session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace(System.out);
            }
        }
    });
    connection.start();

    {
        // A client sends two messages...
        Connection connection1 = pooledConnectionFactory.createConnection();
        Session session1 = connection1.createSession(false, Session.AUTO_ACKNOWLEDGE);
        connection1.start();

        MessageProducer producer = session1.createProducer(helloQueue);
        producer.send(session1.createTextMessage("Hello World 1"));
        producer.send(session1.createTextMessage("Hello World 2"));

        producer.close();
        session1.close();
        connection1.stop();
        connection1.close();
    }
    JOptionPane.showInputDialog("I will wait, you watch the log...");

    consumer.close();
    session.close();
    connection.stop();
    connection.close();
    pooledConnectionFactory.stop();
    brokerService.stop(); // shut down the embedded broker started above

    assertEquals(3, atomicInteger.get());
}

}