How to read data using the Kafka Consumer API from the beginning?

Nits · Feb 17, 2015 · Viewed 86.4k times

Can anyone tell me how to read messages with the Kafka Consumer API from the beginning every time I run the consumer?

Answer

Nautilus · Mar 5, 2016

This works with the 0.9.x consumer. When you create a consumer, you assign it a consumer group id through the property ConsumerConfig.GROUP_ID_CONFIG. Generate that group id randomly every time you start the consumer, for example with properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString()); (properties is an instance of java.util.Properties that you pass to the constructor new KafkaConsumer(properties)).

Generating the group id randomly means that the new consumer group has no offsets associated with it in Kafka. The next step is to set a policy for that situation. As the documentation for the auto.offset.reset property says:

What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer's group
  • anything else: throw exception to the consumer.

So, from the options listed above, we need to choose the earliest policy so that the new consumer group starts from the beginning every time.
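To make the three policies concrete, here is a toy model of how the starting position gets chosen. It is only a sketch of the rule quoted above, not Kafka's implementation; the class and method names (OffsetResetModel, startingOffset) and the example offsets are made up for illustration.

```java
import java.util.OptionalLong;

// Toy model of the auto.offset.reset rule; not Kafka's actual code.
final class OffsetResetModel {

    // committed: the group's stored offset, empty for a brand-new group.
    // beginning/end: the earliest and latest offsets available in the partition.
    static long startingOffset(OptionalLong committed, String policy,
                               long beginning, long end) {
        // A stored offset that still exists on the server always wins.
        if (committed.isPresent()
                && committed.getAsLong() >= beginning
                && committed.getAsLong() <= end) {
            return committed.getAsLong();
        }
        // Otherwise the policy decides.
        switch (policy) {
            case "earliest":
                return beginning;
            case "latest":
                return end;
            default:
                // "none" (and, in this model, anything else) is an error.
                throw new IllegalStateException(
                        "no usable offset and policy is " + policy);
        }
    }

    public static void main(String[] args) {
        // New random group id: nothing committed, so the policy applies.
        System.out.println(startingOffset(OptionalLong.empty(), "earliest", 0L, 500L));
        // Reused group id: the committed offset is used and the policy is ignored.
        System.out.println(startingOffset(OptionalLong.of(120L), "earliest", 0L, 500L));
    }
}
```

This is why the random group id matters: with a reused id the second case applies, and the consumer resumes where the group left off instead of starting over.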

Your code in Java will look something like this:

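// A fresh group id on every start means the group has no committed offsets.
properties.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "your_client_id");
// With no committed offsets, start from the earliest available record.
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumer = new KafkaConsumer<>(properties);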
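The snippet above leaves out the settings a consumer also needs in order to connect and deserialize records. Here is a slightly fuller sketch, written with plain string keys so that it compiles without the Kafka jar on the classpath: group.id, client.id and auto.offset.reset are the values behind the ConsumerConfig constants used above. The broker address localhost:9092, the String deserializers and the class name FromBeginningConfig are assumptions for illustration; substitute your own.

```java
import java.util.Properties;
import java.util.UUID;

final class FromBeginningConfig {

    // Builds consumer properties that start from the earliest offset on every run.
    static Properties build(String bootstrapServers) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        // Fresh group id per start, so the group has no committed offsets.
        properties.put("group.id", UUID.randomUUID().toString());
        properties.put("client.id", "your_client_id");
        // With no committed offsets, begin at the earliest available record.
        properties.put("auto.offset.reset", "earliest");
        // Assumed: record keys and values are plain strings.
        properties.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = build("localhost:9092"); // assumed broker address
        properties.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting object can be passed to the KafkaConsumer constructor in the same way as the properties in the snippet above.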

The only thing left to figure out is the case of multiple distributed consumers that belong to the same consumer group: you need to generate the random id once and distribute it to all of those instances so that they all join the same group.
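One possible way to handle this, offered as a sketch rather than the only answer: generate the id once per deployment, outside the consumers, and hand the same value to every instance. Here the value is read from a JVM system property; the property name consumer.group.id and the class name SharedGroupId are hypothetical, and an environment variable or a shared config store would work the same way.

```java
import java.util.UUID;

final class SharedGroupId {

    // Hypothetical system property carrying the id chosen for this run.
    static final String PROPERTY = "consumer.group.id";

    // Returns the shared id if the launcher supplied one, else a fresh random id.
    static String resolve() {
        String shared = System.getProperty(PROPERTY);
        if (shared != null && !shared.trim().isEmpty()) {
            return shared.trim();
        }
        // Standalone instance: fall back to a group of its own.
        return UUID.randomUUID().toString();
    }

    public static void main(String[] args) {
        System.out.println(resolve());
    }
}
```

Each instance would then be started with the same -Dconsumer.group.id value, generated once by whatever launches them, and would call properties.put(ConsumerConfig.GROUP_ID_CONFIG, SharedGroupId.resolve()).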

Hope it helps!