I am using Spring Kafka first time and I am not able to use Acknowledgement.acknowledge() method for manual commit in my consumer code. please let me know if anything missing in my consumer configuration or listener code. or else is there other way to handle acknowledge offset based on condition. Here i'm looking solution like if the offset is not committed/ acknowledge manually, it should pick same message/offset by consumer.
Configuration
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.AbstractMessageListenerContainer.AckMode;
@EnableKafka
@Configuration
public class ConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Value(value = "${kafka.groupId}")
private String groupId;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
Map<String, Object> props = new HashMap<String, Object>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 100);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<String, String>(
props));
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setSyncCommits(true);
return factory;
}
}
Listener
private static int value = 1;
@KafkaListener(id = "baz", topics = "${message.topic.name}", containerFactory = "containerFactory")
public void listenPEN_RE(@Payload String message,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) int offsets,
Acknowledgment acknowledgment) {
if (value%2==0){
acknowledgment.acknowledge();
}
value++;
}
Set the enable-auto-commit property to false:
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
Set the ack-mode to MANUAL_IMMEDIATE:
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
Then, in your consumer/listener code, you can commit the offset manually, like this:
@KafkaListener(topics = "testKafka")
public void receive(ConsumerRecord<?, ?> consumerRecord,
Acknowledgment acknowledgment) {
System.out.println("Received message: ");
System.out.println(consumerRecord.value().toString());
acknowledgment.acknowledge();
}
Update: I created a small POC for this. Check it out here, might help you.