Using kafka-python 1.0.2.
If I have a topic with 10 partitions, how do I commit the offset for one particular partition while looping through the various partitions and messages? I just can't seem to find an example of this anywhere, in the docs or otherwise.
From the docs, I want to use:
consumer.commit(offsets=offsets)
Specifically, how do I create the {TopicPartition: OffsetAndMetadata} dictionary that the offsets (dict, optional) parameter requires?
I was hoping the function call would just be something like:
consumer.commit(partition, offset)
but this does not seem to be the case.
Thanks in advance.
So it looks like I may have figured it out; funny how that happens when you write down your question. This seems to work:
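from kafka.common import TopicPartition, OffsetAndMetadata  # kafka.structs in later releases
# the dict key is a TopicPartition, not a bare partition number
tp = TopicPartition(message.topic, message.partition)
# commit the next offset to read; metadata is just an optional string
options = {tp: OffsetAndMetadata(message.offset + 1, '')}
consumer.commit(options)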
More testing is needed, but I will update this if anything changes.
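For anyone who wants to try the dictionary-building step without a broker, here is a minimal sketch. It assumes the two-field OffsetAndMetadata(offset, metadata) layout of kafka-python 1.0.x and uses local namedtuple stand-ins instead of the library's own types. build_commit_offsets and Record are hypothetical names made up for this illustration; they are not part of kafka-python.

```python
from collections import namedtuple

# Stand-ins with the same field layout as the kafka-python 1.0.x structs, so
# the sketch runs without the library or a broker. In real code, import the
# library's own TopicPartition and OffsetAndMetadata instead.
TopicPartition = namedtuple("TopicPartition", ["topic", "partition"])
OffsetAndMetadata = namedtuple("OffsetAndMetadata", ["offset", "metadata"])


def build_commit_offsets(messages):
    """Return {TopicPartition: OffsetAndMetadata} for the given messages.

    Hypothetical helper: each message only needs .topic, .partition and
    .offset attributes, as the records yielded by a consumer have.
    """
    offsets = {}
    for msg in messages:
        tp = TopicPartition(msg.topic, msg.partition)
        next_offset = msg.offset + 1  # the committed offset is the next one to read
        current = offsets.get(tp)
        # Keep only the highest position seen for each partition.
        if current is None or next_offset > current.offset:
            offsets[tp] = OffsetAndMetadata(next_offset, "")
    return offsets


# Fake records standing in for messages from a consumer.
Record = namedtuple("Record", ["topic", "partition", "offset"])
batch = [Record("events", 0, 41), Record("events", 3, 7), Record("events", 0, 42)]
offsets = build_commit_offsets(batch)
```

With a real consumer, the resulting dictionary would be passed as consumer.commit(offsets). To commit a single partition, pass a dictionary containing only that partition's entry.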