Manually ack messages in RabbitMQ

lambodar picture lambodar · Apr 15, 2015 · Viewed 8.5k times · Source

Previously I was reading all the messages present in the queue, but now I have to return specific amount of message based of users choice(count).

I try to change the for loop accordingly but its reading all the message because of auto acknowledge. So I tried changing it to manual in config file.

In my program how to ack message manually after reading msg(currently i am using AmqpTemplate to receive and i don't have reference of channel)?

    Properties properties = admin.getQueueProperties("queue_name");
    if(null != properties)
    {
        Integer messageCount = Integer.parseInt(properties.get("QUEUE_MESSAGE_COUNT").toString());          
        while(messageCount > 0)
        {
            Message msg = amqpTemplate.receive(queue_name);
            String value = new String(msg.getBody());
            
            valueList.add(value);
            messageCount--;
        }
}

Any help is highly appreciable, Thanks in advance.

Answer

Gary Russell picture Gary Russell · Apr 15, 2015

You cannot manually ack with the receive() method - use a SimpleMessageListenerContainer for an event-driven consumer with MANUAL acks and a ChannelAwareMessageListener. Or, use the template's execute() method which gives you access to the Channel - but then you will be using the lower-level RabbitMQ API, not the Message abstraction.

EDIT:

You need to learn the underlying RabbitMQ Java API to use execute, but something like this will work...

    final int messageCount = 3;
    boolean result = template.execute(new ChannelCallback<Boolean>() {

        @Override
        public Boolean doInRabbit(final Channel channel) throws Exception {
            int n = messageCount;
            channel.basicQos(messageCount); // prefetch
            long deliveryTag = 0;
            while (n > 0) {
                GetResponse result = channel.basicGet("si.test.queue", false);
                if (result != null) {
                    System.out.println(new String(result.getBody()));
                    deliveryTag = result.getEnvelope().getDeliveryTag();
                    n--;
                }
                else {
                    Thread.sleep(1000);
                }
            }
            if (deliveryTag > 0) {
                channel.basicAck(deliveryTag, true);
            }
            return true;
        }
    });