When I set enable.auto.commit to false and try to commit offsets manually using an annotation-based spring-kafka @KafkaListener, I get an org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
My code is very simple:
@KafkaListener(id = "someid", topics = "${demo.topic}", containerFactory = "someContainerFactory")
public void listenFooGroup(String message, Acknowledgment ack) {
    System.out.println("Received Message in group 'foo': " + message);
    // TODO: Do something with the message
}
And when I send a message from the producer, I get the following exception:
org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message.
Endpoint handler details:
Method [public void com.****.*****.*******.KafkaMessageListener.listenFooGroup(java.lang.String,org.springframework.kafka.support.Acknowledgment)]
Bean [com.****.*****.*******.KafkaMessageListener@5856dbe4]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [java.lang.String] to [org.springframework.kafka.support.Acknowledgment] for GenericMessage [payload=test, headers={kafka_offset=57, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=demotopic}], failedMessage=GenericMessage [payload=test, headers={kafka_offset=57, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=demotopic}]
Please help. TIA.
You have to set the container factory's containerProperties ackMode to MANUAL or MANUAL_IMMEDIATE to get an Acknowledgment object.
With other ack modes, the container is responsible for committing the offset.
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
Or set the ....ackMode property if using Spring Boot.
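For illustration, here is a minimal sketch of how the factory and listener might fit together. It is not a definitive setup: the broker address, the group id, and the KafkaConsumerConfig class name are placeholders I made up, and it assumes a spring-kafka version where AckMode is nested in ContainerProperties (in older versions it lives in AbstractMessageListenerContainer). The bean name someContainerFactory and the listener come from the question.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
@EnableKafka
class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Placeholder broker address and group id; replace with your own.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
        // Turn off the Kafka client's own periodic auto-commit.
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    // The bean name matches containerFactory = "someContainerFactory" in the listener.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> someContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // With a manual ack mode the container passes an Acknowledgment to the listener.
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

@Component
class KafkaMessageListener {

    @KafkaListener(id = "someid", topics = "${demo.topic}", containerFactory = "someContainerFactory")
    public void listenFooGroup(String message, Acknowledgment ack) {
        System.out.println("Received Message in group 'foo': " + message);
        // Acknowledge only after the message has been processed successfully.
        ack.acknowledge();
    }
}
```

Without the ack.acknowledge() call the offset is never committed in a manual ack mode, so the same records would be redelivered after a restart or rebalance.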