DefaultJmsListenerContainerFactory and Concurrent Connections not shutting down

jcb · Sep 10, 2016 · Viewed 12.8k times

I am using Spring 4.x's DefaultJmsListenerContainerFactory in order to connect to an ActiveMQ Queue, process the messages from that queue using a @JmsListener and then push the message to a topic on the same ActiveMQ broker.

I am using a single CachingConnectionFactory for both the consumer/listener and the producer, with cacheConsumers set to false so that producers are cached but consumers are not. I also set the concurrency to 1-3, which I expect to give a minimum of 1 consumer on the queue at application startup and to scale up to 3 consumers as the message volume ramps up.

As the number of messages dwindles, I expected the number of consumers to drop back to 1 as well. However, when I look at the threads (defaultmessagelistenercontainer-2/3), they are in a waiting state and do not shut down. Is it not the expected behaviour that the extra consumers shut down when the load subsides? Please see my configuration below, and let me know whether this behaviour is available out of the box or whether I need to add something to get it working as described.

ApplicationContext.java

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

    factory.setConnectionFactory(connectionFactory());
    factory.setConcurrency(environment.getProperty("jms.connections.concurrent"));
    factory.setSessionTransacted(environment.getProperty("jms.connections.transacted", Boolean.class));
    return factory;
}

@Bean
public CachingConnectionFactory connectionFactory(){
    RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
    redeliveryPolicy.setInitialRedeliveryDelay(environment.getProperty("jms.redelivery.initial-delay", Long.class));
    redeliveryPolicy.setRedeliveryDelay(environment.getProperty("jms.redelivery.delay", Long.class));
    redeliveryPolicy.setMaximumRedeliveries(environment.getProperty("jms.redelivery.maximum", Integer.class));
    redeliveryPolicy.setUseExponentialBackOff(environment.getProperty("jms.redelivery.use-exponential-back-off", Boolean.class));
    redeliveryPolicy.setBackOffMultiplier(environment.getProperty("jms.redelivery.back-off-multiplier", Double.class));

    ActiveMQConnectionFactory activeMQ = new ActiveMQConnectionFactory(environment.getProperty("jms.queue.username"), environment.getProperty("jms.queue.password"), environment.getProperty("jms.broker.endpoint"));
    activeMQ.setRedeliveryPolicy(redeliveryPolicy);
    activeMQ.setPrefetchPolicy(prefetchPolicy());

    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(activeMQ);
    cachingConnectionFactory.setCacheConsumers(environment.getProperty("jms.connections.cache.consumers", Boolean.class));
    cachingConnectionFactory.setSessionCacheSize(environment.getProperty("jms.cache.size", Integer.class));
    return cachingConnectionFactory;
}

@Bean
public JmsMessagingTemplate jmsMessagingTemplate(){
    ActiveMQTopic activeMQ = new ActiveMQTopic(environment.getProperty("jms.queue.out"));

    JmsMessagingTemplate jmsMessagingTemplate = new JmsMessagingTemplate(connectionFactory());
    jmsMessagingTemplate.setDefaultDestination(activeMQ);

    return jmsMessagingTemplate;
}

application.properties

jms.connections.concurrent=1-3
jms.connections.prefetch=1000
jms.connections.transacted=true
jms.connections.cache.consumers=false
jms.redelivery.initial-delay=1000
jms.redelivery.delay=1000
jms.redelivery.maximum=5
jms.redelivery.use-exponential-back-off=true
jms.redelivery.back-off-multiplier=2
jms.cache.size=3
jms.queue.in=in.queue
jms.queue.out=out.queue
jms.broker.endpoint=failover:(tcp://localhost:61616)

Answer

Hassen Bennour · Sep 13, 2016

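Try setting maxMessagesPerTask to a value greater than 0. By default it is unlimited (-1), which means each invoker thread is reused until the container shuts down, so the extra consumers are never released.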

@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory() throws Throwable {
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();

    factory.setConnectionFactory(connectionFactory());
    // Bound each task to a single receive attempt so that idle surplus consumers can end.
    factory.setMaxMessagesPerTask(1);
    factory.setConcurrency(environment.getProperty("jms.connections.concurrent"));
    factory.setSessionTransacted(environment.getProperty("jms.connections.transacted", Boolean.class));
    return factory;
}

You can refer to the documentation: http://docs.spring.io/spring-framework/docs/4.3.x/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setMaxMessagesPerTask-int-
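To illustrate why a bounded task length matters for scaling down, here is a toy model in plain Java. It is not Spring's code: the class ScaleDownSketch, the method consumersAfterIdle and all parameter names are hypothetical, and the idleTaskLimit parameter is only loosely modelled on the container's idleTaskExecutionLimit property. The assumption is that a surplus consumer can only be dropped at the boundary between two tasks, so an unlimited task never reaches that point.

```java
/**
 * Toy model (NOT Spring code) of when surplus consumers can be retired.
 * All names here are hypothetical and exist only for illustration.
 */
final class ScaleDownSketch {

    /**
     * @param minConsumers       lower bound of the concurrency range (the "1" in "1-3")
     * @param runningConsumers   consumers currently running (e.g. 3 after a burst)
     * @param maxMessagesPerTask receive attempts per task; a negative value means unlimited
     * @param idleTaskLimit      fully idle tasks a consumer runs before it counts as surplus
     * @param idleAttempts       consecutive receive attempts that timed out without a message
     * @return the number of consumers still running after the idle period
     */
    static int consumersAfterIdle(int minConsumers, int runningConsumers,
                                  int maxMessagesPerTask, int idleTaskLimit,
                                  int idleAttempts) {
        if (maxMessagesPerTask < 0) {
            // Unlimited task length: the receive loop never returns, so there is
            // no task boundary at which a surplus consumer could be dropped.
            return runningConsumers;
        }
        // Number of complete, entirely idle tasks each consumer has finished.
        int idleTasks = idleAttempts / maxMessagesPerTask;
        if (idleTasks < idleTaskLimit) {
            // Nobody has been idle for long enough yet.
            return runningConsumers;
        }
        // Surplus consumers are not rescheduled; only the minimum remains.
        return Math.min(runningConsumers, minConsumers);
    }

    public static void main(String[] args) {
        // Unlimited tasks: all 3 consumers stay alive after the load subsides.
        System.out.println(consumersAfterIdle(1, 3, -1, 1, 100)); // prints 3
        // One receive attempt per task: the container can fall back to 1 consumer.
        System.out.println(consumersAfterIdle(1, 3, 1, 1, 100));  // prints 1
    }
}
```

In this model the second call mirrors the setMaxMessagesPerTask(1) configuration above. The real container also depends on the receive timeout and on how tasks are rescheduled, which the sketch leaves out.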

jms.connections.prefetch=1000 means that if 1000 messages are waiting on the queue, a single consumer can prefetch all of them, so only 1 thread will be started to process those 1000 messages.

By contrast, jms.connections.prefetch=1 means the messages are dispatched evenly across all available threads. With that setting, however, it is better to set maxMessagesPerTask < 0, because long-lived tasks avoid frequent thread context switches. See http://activemq.apache.org/what-is-the-prefetch-limit-for.html
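The effect of the prefetch limit can be sketched with a small simulation in plain Java. This is a simplified model, not ActiveMQ's dispatcher: the class PrefetchSketch and the method fillInJoinOrder are hypothetical names, and the model assumes that consumers connect one after another, that each takes up to its prefetch limit from whatever is still pending, and that nothing is acknowledged in the meantime.

```java
import java.util.Arrays;

/**
 * Simplified model (NOT ActiveMQ code) of how a prefetch limit affects
 * the initial spread of pending messages across consumers.
 */
final class PrefetchSketch {

    /**
     * Consumers connect in order; each one buffers up to `prefetch` messages
     * from those still pending on the broker.
     *
     * @return the number of messages buffered by each consumer, in join order
     */
    static int[] fillInJoinOrder(int pending, int consumers, int prefetch) {
        int[] buffered = new int[consumers];
        for (int c = 0; c < consumers; c++) {
            int taken = Math.min(prefetch, pending);
            buffered[c] = taken;
            pending -= taken; // whatever is left stays on the broker
        }
        return buffered;
    }

    public static void main(String[] args) {
        // Prefetch 1000: the first consumer buffers the whole backlog.
        System.out.println(Arrays.toString(fillInJoinOrder(1000, 3, 1000))); // prints [1000, 0, 0]
        // Prefetch 1: each consumer holds one message, 997 remain on the broker.
        System.out.println(Arrays.toString(fillInJoinOrder(1000, 3, 1)));    // prints [1, 1, 1]
    }
}
```

With a prefetch of 1, the messages left on the broker go to whichever consumer acknowledges first, which is what spreads the work across the threads.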