I am using the code below to read messages from a topic. I am facing two issues. Whenever I start the consumer, it reads all the messages in the queue. How do I read only the unread messages?
from kafka import KafkaConsumer

consumer = KafkaConsumer('my-topic',
                         group_id='my-group',
                         bootstrap_servers=['localhost:9092'])
for message in consumer:
    consumer.commit()
    # message value and key are raw bytes -- decode if necessary!
    # e.g., for unicode: `message.value.decode('utf-8')`
    print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                         message.offset, message.key,
                                         message.value))
As @Kenji said, you have to commit the offsets with consumer.commit(). If you don't want to commit manually, you can enable autocommit by passing enable_auto_commit=True to your KafkaConsumer. You may also want to tune auto_commit_interval_ms, which is the interval in milliseconds between each automatic commit. See here: http://kafka-python.readthedocs.org/en/master/apidoc/KafkaConsumer.html.
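
As a rough sketch of the autocommit variant, assuming the same topic, group, and broker as in the question: the helper names consumer_settings and consume are made up for illustration, and the 1000 ms interval is only an example value, not a recommendation.

```python
def consumer_settings(group_id, servers, commit_interval_ms=1000):
    """Build keyword arguments for KafkaConsumer with autocommit enabled.

    Hypothetical helper; the 1000 ms default is an illustrative value.
    """
    return {
        "group_id": group_id,
        "bootstrap_servers": servers,
        # Let the client commit offsets periodically in the background
        # instead of calling consumer.commit() by hand.
        "enable_auto_commit": True,
        # Interval in milliseconds between automatic commits.
        "auto_commit_interval_ms": commit_interval_ms,
    }


def consume(topic="my-topic"):
    """Print messages from the topic; needs kafka-python and a running broker."""
    # Imported here so the settings helper can be used without kafka-python.
    from kafka import KafkaConsumer

    settings = consumer_settings("my-group", ["localhost:9092"])
    consumer = KafkaConsumer(topic, **settings)
    for message in consumer:
        # Key and value are raw bytes; decode them if necessary.
        print("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
                                             message.offset, message.key,
                                             message.value))
```

Calling consume() starts the loop. With offsets committed for my-group, a restarted consumer in the same group should resume from the last committed position rather than from the start of the topic.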