Kafka Avro Consumer with Decoder issues

SparkleGoat picture SparkleGoat · Mar 15, 2016 · Viewed 8.6k times · Source

When I attempted to run Kafka Consumer with Avro over the data with my respective schema,it returns an error of "AvroRuntimeException: Malformed data. Length is negative: -40" . I see others have had similar issues converting byte array to json, Avro write and read, and Kafka Avro Binary *coder. I have also referenced this Consumer Group Example, which have all been helpful, however no help with this error thus far.. It works up until this part of code (line 73)

Decoder decoder = DecoderFactory.get().binaryDecoder(byteArrayInputStream, null);

I have tried other decoders and printed out the contents of byteArrayInputStream variable which looks how what I believe you would expect serialized avro data to look (in the message I can see the schema and some data and some malformed data) I have the printed out the Bytes available using .available() method, which returns 594. I am having trouble understanding why this error is happening. Apache Nifi is used to produce the Kafka stream with same schema from hdfs . I would appreciate any help.

Answer

Michael G. Noll picture Michael G. Noll · Mar 16, 2016

Perhaps the problem is a mismatch between how the Avro data is written (encoded) by Nifi vs. how your consumer app is reading (decoding) the data.

In a nutshell, Avro's API provides two different approaches to serialization:

  1. For creating proper Avro files: To encode the data records but also to embed the Avro schema in a kind of preamble (via org.apache.avro.file.{DataFileWriter/DataFileReader}). Embedding the schema into Avro files makes a lot of sense because (a) typically the "payload" of Avro files is orders of magnitudes larger than the embedded Avro schema and (b) you can then copy or move those files around at your heart's content and still be sure you can read them again without having to consult someone or something.
  2. To encode only the data records, i.e. to not embed the schema (via org.apache.avro.io.{BinaryEncoder/BinaryDecoder}; note the difference in the package name: io here vs. file above). This approach is often favored when Avro-encoding messages that are being written to a Kafka topic, for example, because in comparison to variant 1 above you do not incur the overhead of re-embedding the Avro schema into every single message, assuming that your (very reasonable) policy is that, for the same Kafka topic, messages are formatted/encoded with the same Avro schema. This is a significant advantage because, in a stream data context, a data-in-motion data record is typically much smaller (commonly between 100 bytes and few hundred KB) than data-at-rest Avro files as described above (often hundreds or thousands of MB); so the size of the Avro schema is relatively large, and thus you don't want to embed it 2000x when writing 2000 data records to Kafka. The drawback is that you must "somehow" track how Avro schemas map to Kafka topics -- or more precisely, you must somehow track with which Avro schema a message was encoded without going down the path of embedding the schema directly. The good news is that there is tooling available in the Kafka ecosystem (Avro schema registry) for doing this transparently. So in comparison to variant 1, variant 2 gains on efficiency at the expense of convenience.

The effect is that the "wire format" for encoded Avro data will look different depending on whether you use (1) or (2) above.

I am not very familiar with Apache Nifi, but a quick look at the source code (e.g. ConvertAvroToJSON.java) suggests to me that it is using variant 1, i.e. it embeds the Avro schema alongside the Avro records. Your consumer code, however, uses DecoderFactory.get().binaryDecoder() and thus variant 2 (no schema embedded).

Perhaps this explains the error you have been running into?