How to use Ack or Nack in Spring AMQP

Chandan picture Chandan · Sep 16, 2016 · Viewed 14.8k times · Source

I am new to Spring AMQP. I am having an application which is a producer sending messages to the other application which is a consumer.

Once the consumer receives the message, we will do validation of the data.

If the data is proper we have to ACK and message should be removed from the Queue. If the data is improper we have to NACK(Negative Acknowledge) the data so that it will be re-queued in RabbitMQ.

I came across

**factory.setDefaultRequeueRejected(false);**( It will not requeue the message at all)

**factory.setDefaultRequeueRejected(true);**( It will requeue the message when exception occurs)

But my case i will acknowledge the message based on validation. Then it should remove the message. If NACK then requeue the message.

I have read in RabbitMQ website

The AMQP specification defines the basic.reject method that allows clients to reject individual, delivered messages, instructing the broker to either discard them or requeue them

How to achieve the above scenario? Please provide me some examples.

I tried a small Program

       logger.info("Job Queue Handler::::::::::" + new Date());
        try {

        }catch(Exception e){

            logger.info("Activity Object Not Found Exception so message should be Re-queued the Message::::::::::::::");

        }

        factory.setErrorHandler(new ConditionalRejectingErrorHandler(cause ->{
            return cause instanceof XMLException;
        }));

Message is not re queuing for different exception factory.setDefaultRequeueRejected(true)

09:46:38,854 ERROR [stderr] (SimpleAsyncTaskExecutor-1) org.activiti.engine.ActivitiObjectNotFoundException: no processes deployed with key 'WF89012'

09:46:39,102 INFO [com.example.bip.rabbitmq.handler.ErrorQueueHandler] (SimpleAsyncTaskExecutor-1) Received from Error Queue: {ERROR=Could not commit JPA transaction; nested exception is javax.persistence.RollbackException: Transaction marked as rollbackOnly}

Answer

Gary Russell picture Gary Russell · Sep 16, 2016

See the documentation.

By default, (with defaultRequeueRejected=true) the container will ack the message (causing it to be removed) if the listener exits normally or reject (and requeue) it if the listener throws an exception.

If the listener (or error handler) throws an AmqpRejectAndDontRequeueException, the default behavior is overridden and the message is discarded (or routed to a DLX/DLQ if so configured) - the container calls basicReject(false) instead of basicReject(true).

So, if your validation fails, throw an AmqpRejectAndDontRequeueException. Or, configure your listener with a custom error handler to convert your exception to an AmqpRejectAndDontRequeueException.

That is described in this answer.

If you really want to take responsibility for acking yourself, set the acknowledge mode to MANUAL and use a ChannelAwareMessageListener or this technique if you are using a @RabbitListener.

But most people just let the container take care of things (once they understand what's going on). Generally, using manual acks is for special use cases, such as deferring acks, or early acking.

EDIT

There was a mistake in the answer I pointed you to (now fixed); you have to look at the cause of the ListenerExecutionFailedException. I just tested this and it works as expected...

@SpringBootApplication
public class So39530787Application {

    private static final String QUEUE = "So39530787";

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So39530787Application.class, args);
        RabbitTemplate template = context.getBean(RabbitTemplate.class);
        template.convertAndSend(QUEUE, "foo");
        template.convertAndSend(QUEUE, "bar");
        template.convertAndSend(QUEUE, "baz");
        So39530787Application bean = context.getBean(So39530787Application.class);
        bean.latch.await(10, TimeUnit.SECONDS);
        System.out.println("Expect 1 foo:"  + bean.fooCount);
        System.out.println("Expect 3 bar:"  + bean.barCount);
        System.out.println("Expect 1 baz:"  + bean.bazCount);
        context.close();
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setErrorHandler(new ConditionalRejectingErrorHandler(
                t -> t instanceof ListenerExecutionFailedException && t.getCause() instanceof FooException));
        return factory;
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE, false, false, true);
    }
    private int fooCount;

    private int barCount;

    private int bazCount;

    private final CountDownLatch latch = new CountDownLatch(5);

    @RabbitListener(queues = QUEUE)
    public void handle(String in) throws Exception {
        System.out.println(in);
        latch.countDown();
        if ("foo".equals(in) && ++this.fooCount < 3) {
            throw new FooException();
        }
        else if ("bar".equals(in) && ++this.barCount < 3) {
            throw new BarException();
        }
        else if ("baz".equals(in)) {
            this.bazCount++;
        }
    }

    @SuppressWarnings("serial")
    public static class FooException extends Exception { }

    @SuppressWarnings("serial")
    public static class BarException extends Exception { }

}

Result:

Expect 1 foo:1
Expect 3 bar:3
Expect 1 baz:1