Acknowledgement.acknowledge() throwing exception in spring-kafka @KafkaListener

Aravind Kv picture Aravind Kv · Jun 27, 2017 · Viewed 9.7k times · Source

When I set enable.auto.commit to false and try to manually commit offset using annotation based spring-kafka @KafkaListener, I get a org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message

I have a very simple code as follows:

@KafkaListener(id = "someid", topics = "${demo.topic}", containerFactory = "someContainerFactory")
public void listenFooGroup(String message, Acknowledgement ack) {
    System.out.println("Received Messasge in group 'foo': " + message);

    // TODO: Do something with the message
}

And when I send a message from the producer, I get the following exception:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message.

Endpoint handler details:

Method [public void com.****.*****.*******.KafkaMessageListener.listenFooGroup(java.lang.String,org.springframework.kafka.support.Acknowledgment)]

Bean [com.****.*****.*******.KafkaMessageListener@5856dbe4]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=test, headers={kafka_offset=57, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=demotopic}], failedMessage=GenericMessage [payload=test, headers={kafka_offset=57, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=demotopic}]

Please help. TIA.

Answer

Gary Russell picture Gary Russell · Jun 27, 2017

You have to the set the container factory's containerProperties ackMode to MANUAL or MANUAL_IMMEDIATE to get an Acknowledgment object.

With other ack modes, the container is responsible for committing the offset.

factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE)

Or set the ....ackMode property if using Spring Boot