Spring JMSTemplate receive all messages in one transaction

Krzysztof Cislo · Jul 30, 2013

I am trying to receive all messages from a queue synchronously, using Spring's JmsTemplate.receive(String) method.

The problem is that I always get only one message. Here is the code:

@Transactional
public List<Message> receiveAllFromQueue(String destination) {
  List<Message> messages = new ArrayList<Message>();
  Message message;
  while ((message = queueJmsTemplate.receive(destination)) != null) {
    messages.add(message);
  }
  return messages;
}

If I remove the @Transactional annotation, I get all the messages, but everything happens outside a transaction, so if an exception is thrown later while processing the messages, they are lost.

Here is the definition of my JmsTemplate bean:

<bean id="queueJmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="pubSubDomain" value="false" />
    <property name="receiveTimeout" value="1" />
    <property name="sessionTransacted" value="true" />
</bean>

What I want is a single transaction in which I receive all pending messages.

Answer

Jonathan H · Sep 27, 2018

The receive method of JmsTemplate creates a new MessageConsumer on every call. By the time of the second call, your transaction is not yet committed, and the consumer created by the first call will have prefetched a number of messages (prefetching is typically done by the JMS provider's client rather than by Spring itself). At that moment there are no messages left for the new consumer to fetch, so your receive call returns null.
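The effect described above can be illustrated with a toy model in plain Java. This is only a sketch: Broker, Consumer, and the prefetch size are hypothetical stand-ins invented for illustration, not JMS or Spring classes, and real providers behave in more complicated ways. It assumes that messages prefetched by a consumer are not handed back to the broker while the transaction is still open.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

/** Toy model only: none of these types are part of JMS or Spring. */
class PrefetchDemo {

    /** Hypothetical broker that holds the pending messages of one queue. */
    static class Broker {
        final Deque<String> pending = new ArrayDeque<>();

        Broker(List<String> messages) {
            pending.addAll(messages);
        }
    }

    /** Hypothetical consumer that eagerly pulls messages into a local buffer. */
    static class Consumer {
        private final Broker broker;
        private final int prefetchSize;
        private final Deque<String> buffer = new ArrayDeque<>();

        Consumer(Broker broker, int prefetchSize) {
            this.broker = broker;
            this.prefetchSize = prefetchSize;
            refill(); // prefetch happens as soon as the consumer exists
        }

        private void refill() {
            while (buffer.size() < prefetchSize && !broker.pending.isEmpty()) {
                buffer.add(broker.pending.poll());
            }
        }

        /** Returns the next buffered message, or null if none is available. */
        String receiveNoWait() {
            if (buffer.isEmpty()) {
                refill();
            }
            return buffer.poll();
        }
    }

    /** Mirrors the question: a fresh consumer for every receive call. */
    static List<String> consumerPerReceive(Broker broker, int prefetchSize) {
        List<String> received = new ArrayList<>();
        String message;
        // Each discarded consumer keeps its prefetched messages (assumption:
        // they are not returned to the broker before the transaction ends).
        while ((message = new Consumer(broker, prefetchSize).receiveNoWait()) != null) {
            received.add(message);
        }
        return received;
    }

    /** Mirrors the fix: one consumer drains the queue. */
    static List<String> singleConsumer(Broker broker, int prefetchSize) {
        Consumer consumer = new Consumer(broker, prefetchSize);
        List<String> received = new ArrayList<>();
        String message;
        while ((message = consumer.receiveNoWait()) != null) {
            received.add(message);
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("m1", "m2", "m3", "m4", "m5");
        System.out.println(consumerPerReceive(new Broker(messages), 10)); // prints [m1]
        System.out.println(singleConsumer(new Broker(messages), 10));     // prints [m1, m2, m3, m4, m5]
    }
}
```

In this model the first consumer takes all five messages into its buffer and hands out one, so the second consumer finds the broker empty, which matches the single message seen in the question.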

JmsTemplate in Spring has an execute method that takes a SessionCallback as parameter. This allows you to run your own code against the underlying session of the JmsTemplate. Creating only one MessageConsumer should fix your problem.

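@Transactional
public List<Message> receiveAllFromQueue(String destination) {
    // queueJmsTemplate is the JmsTemplate bean defined in the question.
    return queueJmsTemplate.execute(session -> {
        // One consumer for the whole loop; try-with-resources needs JMS 2.0,
        // where MessageConsumer is AutoCloseable.
        try (final MessageConsumer consumer = session.createConsumer(session.createQueue(destination))) {
            List<Message> messages = new ArrayList<>();
            Message message;
            // receiveNoWait() returns null once no message is immediately available.
            while ((message = consumer.receiveNoWait()) != null) {
                messages.add(message);
            }
            return messages;
        }
    }, true); // true: start the connection so messages can be delivered
}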