Error to serialize message when sending to kafka topic

Tiago Costa picture Tiago Costa · Apr 25, 2017 · Viewed 21.1k times · Source

i need to test a message, which contains headers, so i need to use MessageBuilder, but I can not serialize.

I tried adding the serialization settings on the producer props but it did not work.

Can someone help me?

this error:

org.apache.kafka.common.errors.SerializationException: Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

My test class:

public class TransactionMastercardAdapterTest extends AbstractTest{

@Autowired
private KafkaTemplate<String, Message<String>> template;

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);

@BeforeClass
public static void setUp() {
    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}

@Test
public void sendTransactionCommandTest(){

    String payload = "{\"o2oTransactionId\" : \"" + UUID.randomUUID().toString().toUpperCase() + "\","
            + "\"cardId\" : \"11\","
            + "\"transactionId\" : \"20110405123456\","
            + "\"amount\" : 200.59,"
            + "\"partnerId\" : \"11\"}";

    Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafka);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    Producer<String, Message<String>> producer = new KafkaProducer<>(props);
    producer.send(new ProducerRecord<String, Message<String>> ("notification_topic", MessageBuilder.withPayload(payload)
            .setHeader("status", "RECEIVED")
            .setHeader("service", "MASTERCARD")
            .build()));

    Map<String, Object> configs = KafkaTestUtils.consumerProps("test1", "false", embeddedKafka);

    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    ConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(configs);

    Consumer<byte[], byte[]> consumer = cf.createConsumer();
    consumer.subscribe(Collections.singleton("transaction_topic"));
    ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
    consumer.commitSync();

    assertThat(records.count()).isEqualTo(1);
}

}

Answer

Artem Bilan picture Artem Bilan · Apr 25, 2017

I'd say the error is obvious:

Can't convert value of class org.springframework.messaging.support.GenericMessage to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer

Where your value is GenericMessage, but StringSerializer can work only with strings.

What you need is called JavaSerializer which does not exist, but not so difficult to write:

public class JavaSerializer implements Serializer<Object> {

    @Override
    public byte[] serialize(String topic, Object data) {
        try {
            ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
            ObjectOutputStream objectStream = new ObjectOutputStream(byteStream);
            objectStream.writeObject(data);
            objectStream.flush();
            objectStream.close();
            return byteStream.toByteArray();
        }
        catch (IOException e) {
            throw new IllegalStateException("Can't serialize object: " + data, e);
        }
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public void close() {

    }

}

And configure it for that value.serializer property.