This question is for Spring Kafka, related to Apache Kafka with High Level Consumer: Skip corrupted messages
Is there a way to configure Spring Kafka consumer to skip a record that cannot be read/processed (is corrupt)?
I am seeing a situation where the consumer gets stuck on the same record if it cannot be deserialized. This is the error the consumer throws.
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Can not construct instance of java.time.LocalDate: no long/Long-argument constructor/factory method to deserialize from Number value
The consumer polls the topic and keeps printing the same error in a loop until the program is killed.
This happens in a @KafkaListener whose consumer factory is configured as follows:
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
You need ErrorHandlingDeserializer, which is available as of Spring Kafka 2.2.
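As a rough sketch of how the question's configuration might be adapted: the wrapper is registered as the value deserializer and the original JsonDeserializer is handed to it as a delegate, so a deserialization failure is caught by the wrapper instead of being thrown from the poll. The class name ConsumerProps and the method consumerProps are made up for illustration. The class and constant names are assumed to match your Spring Kafka version; some releases ship the wrapper as ErrorHandlingDeserializer2, so check the reference documentation for the version you use.

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.kafka.support.serializer.ErrorHandlingDeserializer;
import org.springframework.kafka.support.serializer.JsonDeserializer;

// Hypothetical holder class; only the two value-deserializer entries differ from the question.
class ConsumerProps {

    static Map<String, Object> consumerProps(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // The error-handling wrapper becomes the configured value deserializer ...
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
        // ... and the original JsonDeserializer is passed to it as the delegate.
        props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class);
        return props;
    }
}
```

Because the delegate is now created from properties, any settings the JsonDeserializer needs (such as its target type) also have to be supplied through the same map.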
If you can't move to version 2.2, consider implementing your own deserializer that returns null for records that can't be deserialized properly.
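The "return null" idea can be sketched with nothing but the JDK, so it runs as-is. The class name NullOnErrorDecoder and the LocalDate-parsing delegate are hypothetical stand-ins. In a real consumer the class would implement org.apache.kafka.common.serialization.Deserializer<T>, whose deserialize(String, byte[]) method this mirrors, and would wrap the JSON deserializer; depending on the kafka-clients version, configure and close may also have to be implemented.

```java
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.util.function.Function;

// Hypothetical wrapper: delegates decoding and returns null instead of throwing,
// so a single corrupt record cannot block the consumer.
class NullOnErrorDecoder<T> {

    private final Function<byte[], T> delegate;

    public NullOnErrorDecoder(Function<byte[], T> delegate) {
        this.delegate = delegate;
    }

    // Same shape as Kafka's Deserializer#deserialize(String topic, byte[] data).
    public T deserialize(String topic, byte[] data) {
        if (data == null) {
            return null; // nothing to decode
        }
        try {
            return delegate.apply(data);
        } catch (RuntimeException e) {
            // Report the failure and move on rather than rethrowing.
            System.err.println("Skipping undecodable record on topic " + topic + ": " + e);
            return null;
        }
    }

    public static void main(String[] args) {
        NullOnErrorDecoder<LocalDate> decoder = new NullOnErrorDecoder<>(
                bytes -> LocalDate.parse(new String(bytes, StandardCharsets.UTF_8)));
        // A well-formed ISO date decodes normally.
        System.out.println(decoder.deserialize("demo", "2019-01-15".getBytes(StandardCharsets.UTF_8)));
        // A numeric timestamp is not an ISO date, so the wrapper yields null.
        System.out.println(decoder.deserialize("demo", "1547510400".getBytes(StandardCharsets.UTF_8)));
    }
}
```

With this approach the listener has to be prepared to receive a null payload for the skipped records.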
The source code is here: