I am just starting to use Kafka with Spring Boot & want to send & consume JSON objects.
I am getting the following error when I attempt to consume an message from the Kafka topic:
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition dev.orders-0 at offset 9903. If needed, please seek past the record to continue consumption.
Caused by: java.lang.IllegalArgumentException: The class 'co.orders.feedme.feed.domain.OrderItem' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:139) ~[spring-kafka-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:113) ~[spring-kafka-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:218) ~[spring-kafka-2.1.5.RELEASE.jar:2.1.5.RELEASE]
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:923) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.access$2600(Fetcher.java:93) ~[kafka-clients-1.0.1.jar:na]
I have attempted to add my package to the list of trusted packages by defining the following property in application.properties:
spring.kafka.consumer.properties.spring.json.trusted.packages = co.orders.feedme.feed.domain
This doesn't appear to make any differences. What is the correct way to add my package to the list of trusted packages for Spring's Kafka JsonDeserializer?
Ok, I have read the documentation in a bit more detail & have found an answer to my question. I am using Kotlin so the creation of my consumer looks like this with the
@Bean
fun consumerFactory(): ConsumerFactory<String, FeedItem> {
val configProps = HashMap<String, Any>()
configProps[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = bootstrapServers
configProps[ConsumerConfig.GROUP_ID_CONFIG] = "feedme"
configProps[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
configProps[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = JsonDeserializer::class.java
configProps[JsonDeserializer.TRUSTED_PACKAGES] = "co.orders.feedme.feed.domain"
return DefaultKafkaConsumerFactory(configProps)
}
Now I just need a way to override the creation of the Jackson ObjectMapper in the JsonDeserializer so that it can work with my Kotlin data classes that don't have a zero-argument constructor :)