Spring-boot-starter RabbitMQ global error handling

VelNaga picture VelNaga · Feb 13, 2017 · Viewed 14k times · Source

I am using spring-boot-starter-amqp 1.4.2.Producer and consumer working fine but sometimes the incoming JSON messages have an incorrect syntax. This results in the following (correct) exception:

org.springframework.amqp.rabbit.listener.ListenerExecutionFailedException: Listener threw exception
Caused by:  org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not deserialize instance of java.lang.String out of START_ARRAY token...

In future i may face lot more exceptions. So i want to configure a global error handler so that if there is any exception in any one the consumer i can handle it globally.

Note : In this case message is not at all reached consumer. I want to handle these kind of exceptions globally across the consumer.

Please find the below code :

RabbitConfiguration.java

@Configuration
@EnableRabbit
public class RabbitMqConfiguration {

    @Autowired
    private CachingConnectionFactory cachingConnectionFactory;

    @Bean
    public MessageConverter jsonMessageConverter()
    {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    @Primary
    public RabbitTemplate rabbitTemplate()
    {
        RabbitTemplate template = new RabbitTemplate(cachingConnectionFactory);
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

}

Consumer

@RabbitListener(
        id = "book_queue",
        bindings = @QueueBinding(
                value = @Queue(value = "book.queue", durable = "true"),
                exchange = @Exchange(value = "book.exchange", durable = "true", delayed = "true"),
                key = "book.queue"
        )
    )
public void handle(Message message) {
//Business Logic
}

Could anyone please assist me to handle the error handler globally.Your help should be appreciable.

Updated question as per Gary comment

I can able to run your example and getting the expected output as you said, I just want to try few more negative cases based on your example, but i couldn't understand few things,

this.template.convertAndSend(queue().getName(), new Foo("bar"));

output

Received: Foo [foo=bar]

The above code is working fine.Now instead of "Foo" i am sending some other bean

this.template.convertAndSend(queue().getName(), new Differ("snack","Hihi","how are you"));

output

Received: Foo [foo=null]

The consumer shouldn't accept this message because it is completely a different bean(Differ.class not Foo.class) so i am expecting it should go to "ConditionalRejectingErrorHandler".Why it is accepting wrong payload and printing as null ? Please correct me if i am wrong.

Edit 1 :

Gary, As you said i have set the header "TypeId" while sending the message but still consumer can able to convert wrong messages and it is not throwing any error...please find the code below, I have used your code samples and just did the following modifications,

1) Added "__TypeId__" while sending the message,

this.template.convertAndSend(queue().getName(), new Differ("snack","hihi","how are you"),m -> {
        m.getMessageProperties().setHeader("__TypeId__","foo");
        return m;
    }); 

2) Added "DefaultClassMapper" in the "Jackson2JsonMessageConverter"

@Bean
public MessageConverter jsonConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    DefaultClassMapper mapper = new DefaultClassMapper();
    mapper.setDefaultType(Foo.class);
    converter.setClassMapper(mapper);
    return new Jackson2JsonMessageConverter();
}    

Answer

Gary Russell picture Gary Russell · Feb 14, 2017

Override Boot's listener container factory bean, as described in Enable Listener Endpoint Annotations.

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setErrorHandler(myErrorHandler());
    ...
    return factory;
}

You can inject a custom implementation of ErrorHandler which will be added to each listener container the factory creates.

void handleError(Throwable t);

The throwable will be a ListenerExecutionFailedException which, starting with version 1.6.7 (boot 1.4.4), has the raw inbound message in its failedMessage property.

The default error handler considers causes such as MessageConversionException to be fatal (they will not be requeued).

If you wish to retain that behavior (normal for such problems), you should throw an AmqpRejectAndDontRequeueException after handling the error.

By the way, you don't need that RabbitTemplate bean; if you have just one MessageConverter bean in the application, boot will auto-wire it into the containers and template.

Since you will be overriding boot's factory, you will have to wire in the converter there.

EDIT

You could use the default ConditionalRejectingErrorHandler, but inject it with a custom implementation of FatalExceptionStrategy. In fact, you could subclass its DefaultExceptionStrategy and override isFatal(Throwable t), then, after handing the error, return super.isFatal(t).

EDIT2

Full example; sends 1 good message and 1 bad one:

package com.example;

import org.slf4j.Logger;

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler;
import org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.util.ErrorHandler;

@SpringBootApplication
public class So42215050Application {

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
        context.getBean(So42215050Application.class).runDemo();
        context.close();
    }

    @Autowired
    private RabbitTemplate template;

    private void runDemo() throws Exception {
        this.template.convertAndSend(queue().getName(), new Foo("bar"));
        this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
            return new Message("some bad json".getBytes(), m.getMessageProperties());
        });
        Thread.sleep(5000);
    }

    @RabbitListener(queues = "So42215050")
    public void handle(Foo in) {
        System.out.println("Received: " + in);
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonConverter());
        factory.setErrorHandler(errorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler errorHandler() {
        return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
    }

    @Bean
    public Queue queue() {
        return new Queue("So42215050", false, false, true);
    }

    @Bean
    public MessageConverter jsonConverter() {
        return new Jackson2JsonMessageConverter();
    }

    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

        private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

        @Override
        public boolean isFatal(Throwable t) {
            if (t instanceof ListenerExecutionFailedException) {
                ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                logger.error("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            }
            return super.isFatal(t);
        }

    }

    public static class Foo {

        private String foo;

        public Foo() {
            super();
        }

        public Foo(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + "]";
        }

    }
}

Result:

Received: Foo [foo=bar]

2017-02-14 09:42:50.972 ERROR 44868 --- [cTaskExecutor-1] 5050Application$MyFatalExceptionStrategy : Failed to process inbound message from queue So42215050; failed message: (Body:'some bad json' MessageProperties [headers={TypeId=com.example.So42215050Application$Foo}, timestamp=null, messageId=null, userId=null, receivedUserId=null, appId=null, clusterId=null, type=null, correlationId=null, correlationIdString=null, replyTo=null, contentType=application/json, contentEncoding=UTF-8, contentLength=0, deliveryMode=null, receivedDeliveryMode=PERSISTENT, expiration=null, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=So42215050, receivedDelay=null, deliveryTag=2, messageCount=0, consumerTag=amq.ctag-P2QqY0PMD1ppX5NnkUPhFA, consumerQueue=So42215050])

EDIT3

JSON does not convey any type information. By default, the type to convert to will be inferred from the method parameter type. If you wish to reject anything that can't be converted to that type, you need to configure the message converter appropriately.

For example:

@Bean
public MessageConverter jsonConverter() {
    Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
    DefaultClassMapper mapper = new DefaultClassMapper();
    mapper.setDefaultType(Foo.class);
    converter.setClassMapper(mapper);
    return converter;
}

Now, when I change my example to send a Bar instead of a Foo...

public static class Bar {

   ...

}

and

this.template.convertAndSend(queue().getName(), new Bar("baz"));

I get...

Caused by: org.springframework.amqp.support.converter.MessageConversionException: Cannot handle message
... 13 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.So42215050Application$Bar] to [com.example.So42215050Application$Foo] for GenericMessage [payload=Bar [foo=baz], headers={amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedRoutingKey=So42215050, amqp_contentEncoding=UTF-8, amqp_deliveryTag=3, amqp_consumerQueue=So42215050, amqp_redelivered=false, id=6d7e23a3-c2a7-2417-49c9-69e3335aa485, amqp_consumerTag=amq.ctag-6JIGkpmkrTKaG32KVpf8HQ, contentType=application/json, __TypeId__=com.example.So42215050Application$Bar, timestamp=1488489538017}]

But this only works if the sender sets the __TypeId__ header (which the template does if it's configured with the same adapter).

EDIT4

@SpringBootApplication
public class So42215050Application {

    private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

    public static void main(String[] args) throws Exception {
        ConfigurableApplicationContext context = SpringApplication.run(So42215050Application.class, args);
        context.getBean(So42215050Application.class).runDemo();
        context.close();
    }

    @Autowired
    private RabbitTemplate template;

    private void runDemo() throws Exception {
        this.template.convertAndSend(queue().getName(), new Foo("bar")); // good - converter sets up type
        this.template.convertAndSend(queue().getName(), new Foo("bar"), m -> {
            return new Message("some bad json".getBytes(), m.getMessageProperties()); // fail bad json
        });
        Message message = MessageBuilder
                .withBody("{\"foo\":\"bar\"}".getBytes())
                .andProperties(
                        MessagePropertiesBuilder
                            .newInstance()
                            .setContentType("application/json")
                            .build())
                .build();
        this.template.send(queue().getName(), message); // Success - default Foo class when no header
        message.getMessageProperties().setHeader("__TypeId__", "foo");
        this.template.send(queue().getName(), message); // Success - foo is mapped to Foo
        message.getMessageProperties().setHeader("__TypeId__", "bar");
        this.template.send(queue().getName(), message); // fail - mapped to a Map
        Thread.sleep(5000);
    }

    @RabbitListener(queues = "So42215050")
    public void handle(Foo in) {
        logger.info("Received: " + in);
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setMessageConverter(jsonConverter());
        factory.setErrorHandler(errorHandler());
        return factory;
    }

    @Bean
    public ErrorHandler errorHandler() {
        return new ConditionalRejectingErrorHandler(new MyFatalExceptionStrategy());
    }

    @Bean
    public Queue queue() {
        return new Queue("So42215050", false, false, true);
    }

    @Bean
    public MessageConverter jsonConverter() {
        Jackson2JsonMessageConverter converter = new Jackson2JsonMessageConverter();
        DefaultClassMapper mapper = new DefaultClassMapper();
        mapper.setDefaultType(Foo.class);
        Map<String, Class<?>> mappings = new HashMap<>();
        mappings.put("foo", Foo.class);
        mappings.put("bar", Object.class);
        mapper.setIdClassMapping(mappings);
        converter.setClassMapper(mapper);
        return converter;
    }

    public static class MyFatalExceptionStrategy extends ConditionalRejectingErrorHandler.DefaultExceptionStrategy {

        private final Logger logger = org.slf4j.LoggerFactory.getLogger(getClass());

        @Override
        public boolean isFatal(Throwable t) {
            if (t instanceof ListenerExecutionFailedException) {
                ListenerExecutionFailedException lefe = (ListenerExecutionFailedException) t;
                logger.error("Failed to process inbound message from queue "
                        + lefe.getFailedMessage().getMessageProperties().getConsumerQueue()
                        + "; failed message: " + lefe.getFailedMessage(), t);
            }
            return super.isFatal(t);
        }

    }

    public static class Foo {

        private String foo;

        public Foo() {
            super();
        }

        public Foo(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + "]";
        }

    }

    public static class Bar {

        private String foo;

        public Bar() {
            super();
        }

        public Bar(String foo) {
            this.foo = foo;
        }

        public String getFoo() {
            return this.foo;
        }

        public void setFoo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Bar [foo=" + this.foo + "]";
        }

    }

}