Kafka suddenly reset the consumer Offset

Gnarik picture Gnarik · Mar 26, 2015 · Viewed 11.2k times · Source

I'm working with Kafka 0.8 & zookeeper 3.3.5. Actually, we have a dozen of topic we are consuming without any issue.

Recently, we started to feed and consume a new topic that has a weird behavior. The consumed offset was suddenly reset. It respects the auto.offset.reset policy we set (actually smallest) but I cannot understand why the topic suddenly reset its offset.

I'm using the high-level consumer.

Here are some ERROR log I found: We have a bunch of this error log:

[2015-03-26 05:21:17,789] INFO Fetching metadata from broker id:1,host:172.16.23.1,port:9092 with correlation id 47 for 1 topic(s) Set(MyTopic) (kafka.cl
ient.ClientUtils$)
[2015-03-26 05:21:17,789] ERROR Producer connection to 172.16.23.1:9092 unsuccessful (kafka.producer.SyncProducer)
java.nio.channels.ClosedByInterruptException
        at java.nio.channels.spi.AbstractInterruptibleChannel.end(AbstractInterruptibleChannel.java:202)
        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:681)
        at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
        at kafka.producer.SyncProducer.connect(SyncProducer.scala:141)
        at kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:156)
        at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
        at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
        at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:88)
        at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
        at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51)

Each time this issue happened I see that WARN log:

[2015-03-26 05:21:30,596] WARN Reconnect due to socket error: null (kafka.consumer.SimpleConsumer)

And then the real problem happens:

[2015-03-26 05:21:47,551] INFO Connected to 172.16.23.5:9092 for producing (kafka.producer.SyncProducer)
[2015-03-26 05:21:47,552] INFO Disconnecting from 172.16.23.5:9092 (kafka.producer.SyncProducer)
[2015-03-26 05:21:47,553] INFO [ConsumerFetcherManager-1427047649942] Added fetcher for partitions ArrayBuffer([[MyTopic,0], initOffset 45268422051 to br
oker id:5,host:172.16.23.5,port:9092] ) (kafka.consumer.ConsumerFetcherManager)
[2015-03-26 05:21:47,553] INFO [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Starting  (kafka.consumer.Cons
umerFetcherThread)
[2015-03-26 05:21:50,388] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 45268422051 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)
[2015-03-26 05:21:50,490] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 1948447612 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)
[2015-03-26 05:21:50,591] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 1948447612 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)
[2015-03-26 05:21:50,692] ERROR [ConsumerFetcherThread-MyTopic_group-1427047649884-699191d4-0-5], Current offset 1948447612 for partition [MyTopic,0] out of range; reset offset to 1948447612 (kafka.consumer.ConsumerFetcherThread)

Now the questions: Is there someone who has already experienced this behavior ? Is there someone who can tell me when Kafka decides to reset its offset whether the auto.offset.reset is largest or smallest ?

Thank you.

Answer

C4stor picture C4stor · Apr 7, 2016

What's happening is you are depiling your topic too slowly for a while.

Kafka has a retention model which is not based on whether consumer got the data, but on disk usage and/or period. At some point, you get too late, and the next message you need has already been wiped out, and isn't available anymore, due to kafka having cleaned the data. Hence the Current offset 45268422051 for partition [MyTopic,0] out of range; reset offset to 1948447612 messages.

Your consumer then applies your reset policy to bootstrap itself again, in your case smallest.

It's a common issue when you have bursty workflows, and sometimes fall out of the data retention range. It probably disappeared because you improved your depiling speed, or increased your retention policy to be able to survive bursts.