I'm trying to use Spring Cloud Stream to integrate with Kafka. The message being written is a Java POJO and while it works as expected (the message is being written to the topic and I can read off with a consumer app), there are some unknown characters being added to the start of the message which are causing trouble when trying to integrate Kafka Connect to sink the messages from the topic.
With the default setup this is the message being pushed to Kafka:
contentType "text/plain"originalContentType "application/json;charset=UTF-8"{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471,"version":null}}
If I configure the Kafka producer within the Java app then the message is written to the topic without the leading characters / headers:
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, Object> producerFactory() {
Map<String, Object> configProps = new HashMap<>();
configProps.put(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
"localhost:9092");
configProps.put(
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
StringSerializer.class);
configProps.put(
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
JsonSerializer.class);
return new DefaultKafkaProducerFactory<String, Object>(configProps);
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
Message on Kafka:
{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471}
Since I'm just setting the key/value serializers I would've expected to be able to do this within the application.yml
properties file, rather than doing it through the code.
However, when the yml is updated to specify the serializers it's not working as I would expect i.e. it's not generating the same message as the producer configured in Java (above):
spring:
profiles: local
cloud:
stream:
bindings:
session:
destination: session
contentType: application/json
kafka:
binder:
brokers: localhost
zkNodes: localhost
defaultZkPort: 2181
defaultBrokerPort: 9092
bindings:
session:
producer:
configuration:
value:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
key:
serializer: org.apache.kafka.common.serialization.StringSerializer
Message on Kafka:
"/wILY29udGVudFR5cGUAAAAMInRleHQvcGxhaW4iE29yaWdpbmFsQ29udGVudFR5cGUAAAAgImFwcGxpY2F0aW9uL2pzb247Y2hhcnNldD1VVEYtOCJ7InBheWxvYWQiOnsidXNlcm5hbWUiOiJqb2huIn0sIm1ldGFkYXRhIjp7ImV2ZW50TmFtZSI6IkxvZ2luIiwic2Vzc2lvbklkIjoiNGI3YTBiZGEtOWQwZS00Nzg5LTg3NTQtMTQyNDUwYjczMThlIiwidXNlcm5hbWUiOiJqb2huIiwiaGFzU2VudCI6bnVsbCwiY3JlYXRlRGF0ZSI6MTUxMTE4NjI2NDk4OSwidmVyc2lvbiI6bnVsbH19"
Should it be possible to configure this solely through the application yml? Are there additional settings that are missing?
Credit to @Gary for the answer above!
For completeness, the configuration which is now working for me is below.
spring:
profiles: local
cloud:
stream:
bindings:
session:
producer:
useNativeEncoding: true
destination: session
contentType: application/json
kafka:
binder:
brokers: localhost
zkNodes: localhost
defaultZkPort: 2181
defaultBrokerPort: 9092
bindings:
session:
producer:
configuration:
value:
serializer: org.springframework.kafka.support.serializer.JsonSerializer
key:
serializer: org.apache.kafka.common.serialization.StringSerializer