Kafka producer JSON serialization

Will picture Will · Nov 20, 2017 · Viewed 7.7k times · Source

I'm trying to use Spring Cloud Stream to integrate with Kafka. The message being written is a Java POJO and while it works as expected (the message is being written to the topic and I can read off with a consumer app), there are some unknown characters being added to the start of the message which are causing trouble when trying to integrate Kafka Connect to sink the messages from the topic.

With the default setup this is the message being pushed to Kafka:

     contentType   "text/plain"originalContentType    "application/json;charset=UTF-8"{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471,"version":null}}

If I configure the Kafka producer within the Java app then the message is written to the topic without the leading characters / headers:

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost:9092");
        configProps.put(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
        configProps.put(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            JsonSerializer.class);

        return new DefaultKafkaProducerFactory<String, Object>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

Message on Kafka:

{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471}

Since I'm just setting the key/value serializers I would've expected to be able to do this within the application.yml properties file, rather than doing it through the code. However, when the yml is updated to specify the serializers it's not working as I would expect i.e. it's not generating the same message as the producer configured in Java (above):

spring:
  profiles: local
  cloud:
    stream:
      bindings:
        session:
          destination: session
          contentType: application/json
      kafka:
        binder:
          brokers: localhost
          zkNodes: localhost
          defaultZkPort: 2181
          defaultBrokerPort: 9092
        bindings:
          session:
            producer:
              configuration:
                value:
                  serializer: org.springframework.kafka.support.serializer.JsonSerializer
                key:
                  serializer: org.apache.kafka.common.serialization.StringSerializer

Message on Kafka:

"/wILY29udGVudFR5cGUAAAAMInRleHQvcGxhaW4iE29yaWdpbmFsQ29udGVudFR5cGUAAAAgImFwcGxpY2F0aW9uL2pzb247Y2hhcnNldD1VVEYtOCJ7InBheWxvYWQiOnsidXNlcm5hbWUiOiJqb2huIn0sIm1ldGFkYXRhIjp7ImV2ZW50TmFtZSI6IkxvZ2luIiwic2Vzc2lvbklkIjoiNGI3YTBiZGEtOWQwZS00Nzg5LTg3NTQtMTQyNDUwYjczMThlIiwidXNlcm5hbWUiOiJqb2huIiwiaGFzU2VudCI6bnVsbCwiY3JlYXRlRGF0ZSI6MTUxMTE4NjI2NDk4OSwidmVyc2lvbiI6bnVsbH19"

Should it be possible to configure this solely through the application yml? Are there additional settings that are missing?

Answer

Will picture Will · Nov 21, 2017

Credit to @Gary for the answer above!

For completeness, the configuration which is now working for me is below.

spring:
  profiles: local
  cloud:
    stream:
      bindings:
        session:
          producer:
            useNativeEncoding: true
          destination: session
          contentType: application/json
      kafka:
        binder:
          brokers: localhost
          zkNodes: localhost
          defaultZkPort: 2181
          defaultBrokerPort: 9092
        bindings:
          session:
            producer:
              configuration:
                value:
                  serializer: org.springframework.kafka.support.serializer.JsonSerializer
                key:
                  serializer: org.apache.kafka.common.serialization.StringSerializer