I am trying to create a Kafka Streams Application which processes Avro records, but I am getting the following error:
Exception in thread "streams-application-c8031218-8de9-4d55-a5d0-81c30051a829-StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Deserialization exception handler is set to fail upon a deserialization error. If you would rather have the streaming pipeline continue after a deserialization error, please set the default.deserialization.exception.handler appropriately.
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:74)
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:91)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:567)
at org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:900)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:801)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:749)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:719)
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
I am not sure what is causing this error. I am just trying to get Avro records into the application first where they then will be processed and then output to another topic but it doesn't not seem to be working. I have included the code from the application below. Can anyone see why it is not working?
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Serde<String> stringSerde = Serdes.String();
Serde<trackingReport> specificAvroTrackingReportSerde = new SpecificAvroSerde<trackingReport>();
specificAvroTrackingReportSerde.configure(Collections.singletonMap(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081"), false);
StreamsBuilder builder = new StreamsBuilder();
KStream<String, trackingReport> inputreports = builder.stream("intesttopic", Consumed.with(stringSerde, specificAvroTrackingReportSerde));
KStream<String, trackingReport> outputreports = inputreports;
String outputTopic = "outtesttopic";
outputreports.to(outputTopic, Produced.with(stringSerde, specificAvroTrackingReportSerde));
Topology topology = builder.build();
KafkaStreams streams = new KafkaStreams(topology, props);
streams.start();
Unknown magic byte!
Means your data does not adhere to the wire format that's expected for the Schema Registry.
Or, in other words, the data you're trying to read, is not Avro, as expected by the Confluent Avro deserializer.
You can expect the same error by running kafka-avro-console-consumer
, by the way, so you may want to debug using that too
If you are sure your data is indeed Avro, and the schema is actually sent as part of the message (would need to see your producer code), then you shouldn't use the Confluent Avro deserializers that are expecting a specific byte format in the message. Instead, you could use ByteArrayDesrializer
and read the Avro record yourself, then pass it to the Apache Avro BinaryDecoder class
. As a bonus, you can extract that logic into your own Deserialzer class
Also, if the input topic is Avro, I don't think you should be using this property for reading strings
DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());