KafkaAvroSerializer for serializing Avro without schema.registry.url

scissorHands picture scissorHands · Aug 11, 2017 · Viewed 18.2k times · Source

I'm a noob to Kafka and Avro. So i have been trying to get the Producer/Consumer running. So far i have been able to produce and consume simple Bytes and Strings, using the following : Configuration for the Producer :

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

    Schema.Parser parser = new Schema.Parser();
    Schema schema = parser.parse(USER_SCHEMA);
    Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(schema);

    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

    for (int i = 0; i < 1000; i++) {
        GenericData.Record avroRecord = new GenericData.Record(schema);
        avroRecord.put("str1", "Str 1-" + i);
        avroRecord.put("str2", "Str 2-" + i);
        avroRecord.put("int1", i);

        byte[] bytes = recordInjection.apply(avroRecord);

        ProducerRecord<String, byte[]> record = new ProducerRecord<>("mytopic", bytes);
        producer.send(record);
        Thread.sleep(250);
    }
    producer.close();
}

Now this is all well and good, the problem comes when i'm trying to serialize a POJO. So , i was able to get the AvroSchema from the POJO using the utility provided with Avro. Hardcoded the schema, and then tried to create a Generic Record to send through the KafkaProducer the producer is now set up as :

    Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.KafkaAvroSerializer");

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(USER_SCHEMA); // this is the Generated AvroSchema
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

this is where the problem is : the moment i use KafkaAvroSerializer, the producer doesn't come up due to : missing mandatory parameter : schema.registry.url

I read up on why this is required, so that my consumer is able to decipher whatever the producer is sending to me. But isn't the schema already embedded in the AvroMessage? Would be really great if someone can share a working example of using KafkaProducer with the KafkaAvroSerializer without having to specify schema.registry.url

would also really appreciate any insights/resources on the utility of the schema registry.

thanks!

Answer

Treziac picture Treziac · Aug 11, 2017

Note first: KafkaAvroSerializer is not provided in vanilla apache kafka - it is provided by Confluent Platform. (https://www.confluent.io/), as part of its open source components (http://docs.confluent.io/current/platform.html#confluent-schema-registry)

Rapid answer: no, if you use KafkaAvroSerializer, you will need a schema registry. See some samples here: http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html

The basic idea with schema registry is that each topic will refer to an avro schema (ie, you will only be able to send data coherent with each other. But a schema can have multiple version, so you still need to identify the schema for each record)

We don't want to write the schema for everydata like you imply - often, schema is bigger than your data! That would be a waste of time parsing it everytime when reading, and a waste of ressources (network, disk, cpu)

Instead, a schema registry instance will do a binding avro schema <-> int schemaId and the serializer will then write only this id before the data, after getting it from registry (and caching it for later use).

So inside kafka, your record will be [<id> <bytesavro>] (and magic byte for technical reason), which is an overhead of only 5 bytes (to compare to the size of your schema) And when reading, your consumer will find the corresponding schema to the id, and deserializer avro bytes regarding it. You can find way more in confluent doc

If you really have a use where you want to write the schema for every record, you will need an other serializer (I think writing your own, but it will be easy, just reuse https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/AbstractKafkaAvroSerializer.java and remove the schema registry part to replace it with the schema, same for reading). But if you use avro, I would really discourage this - one day a later, you will need to implement something like avro registry to manage versioning