I have a problem polling messages from Kafka in a consumer group. My consumer object creates a TopicPartition for a given partition:

```python
self.ps = TopicPartition(topic, partition)
```

and then assigns the consumer to that partition:

```python
self.consumer.assign([self.ps])
```
After that I am able to count the messages in the partition with:

```python
self.consumer.seek_to_beginning(self.ps)
pos = self.consumer.position(self.ps)
```

and

```python
self.consumer.seek_to_end(self.ps)
# .....
```
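The counting step can be sketched as a standalone function, assuming `consumer` is a kafka-python `KafkaConsumer` that has already been assigned the partition `tp`; `count_messages` is a hypothetical name. Because `seek_to_end` moves the consumer's position to the end of the partition, a later `poll` would only see newly produced messages, so this sketch seeks back to the first offset before returning. The result is the size of the offset range; on compacted or transactional topics the number of actual records can be smaller.

```python
def count_messages(consumer, tp):
    """Return the number of offsets between the start and end of partition `tp`.

    Assumes `consumer` is a kafka-python KafkaConsumer already assigned `tp`.
    Leaves the consumer positioned at the first available offset.
    """
    consumer.seek_to_beginning(tp)
    first = consumer.position(tp)  # earliest offset still stored in the log
    consumer.seek_to_end(tp)
    end = consumer.position(tp)    # offset the next produced record would get
    # Seek back so that subsequent polls start from the beginning again.
    consumer.seek(tp, first)
    return end - first
```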
My topic contains over 30,000 messages. The problem is that I only get exactly one message.
The consumer is configured with max_poll_records=200, and AUTO_OFFSET_RESET is set to earliest.
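For reference, here is a minimal sketch of those settings as kafka-python `KafkaConsumer` keyword arguments (the keys are lowercase there). The broker address and group id are placeholders, and `consumer_config` is a hypothetical helper. Note that `auto_offset_reset` is only consulted when the consumer has no valid position for a partition (no committed offset, or an offset that is out of range); after an explicit `seek` to a valid offset it has no effect.

```python
def consumer_config(bootstrap_servers, group_id=None):
    """Build KafkaConsumer keyword arguments matching the settings above.

    bootstrap_servers and group_id are placeholders for your environment.
    """
    return {
        "bootstrap_servers": bootstrap_servers,
        "group_id": group_id,
        # Where to start when there is no valid committed/current offset.
        "auto_offset_reset": "earliest",
        # Upper bound on records returned by a single poll(), not a target.
        "max_poll_records": 200,
    }
```

With a running broker this would be used as `KafkaConsumer(**consumer_config("localhost:9092", group_id="my-group"))`.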
Here is the function I use to fetch the messages:

```python
import six

# Method of my consumer class
def poll_messages(self):
    data = []
    messages = self.consumer.poll(timeout_ms=6000)
    for partition, msgs in six.iteritems(messages):
        for msg in msgs:
            data.append(msg)
    return data
```
Even if I seek to the first available offset before I start polling, I still get only one message:

```python
self.consumer.seek(self.ps, self.get_first_offset())
```
I hope someone can explain what I am doing wrong. Thanks in advance.
Best wishes Jörn
I believe you are misunderstanding max_poll_records: it does not mean you will get 200 records per poll; it is only an upper bound on how many records a single poll can return. You will need to call poll multiple times. I'd refer you to the docs for simple examples: http://kafka-python.readthedocs.io/en/master/usage.html
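Calling poll repeatedly can be sketched like this, assuming a single assigned partition `tp` and a kafka-python version that provides `KafkaConsumer.end_offsets()`; `poll_all` and `max_empty_polls` are hypothetical names. The loop keeps polling until the consumer's position reaches the end offset captured at the start, and gives up after a few consecutive empty polls so it cannot spin forever.

```python
def poll_all(consumer, tp, timeout_ms=1000, max_empty_polls=5):
    """Poll repeatedly until the position in `tp` reaches the end offset.

    Stops early after `max_empty_polls` consecutive polls that return nothing.
    """
    # Offset the next produced record would get (snapshot taken once).
    end = consumer.end_offsets([tp])[tp]
    data = []
    empty_polls = 0
    while consumer.position(tp) < end and empty_polls < max_empty_polls:
        # poll() returns {TopicPartition: [ConsumerRecord, ...]} and each call
        # yields at most max_poll_records records, possibly far fewer.
        batch = consumer.poll(timeout_ms=timeout_ms)
        if not batch:
            empty_polls += 1
            continue
        empty_polls = 0
        for records in batch.values():
            data.extend(records)
    return data
```

Each `poll` advances the consumer's position past the records it returned, which is what eventually ends the loop.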
I believe a more standard implementation is to iterate over the consumer directly:

```python
for message in self.consumer:
    # do stuff, e.g.:
    print(message)
```
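By default, iterating over a kafka-python `KafkaConsumer` blocks waiting for new messages, so the loop never ends on its own. A minimal sketch that stops once the last offset present at start-up has been read, assuming a single assigned partition `tp`; `consume_until_end` is a hypothetical name. If the final offset is a transaction marker or was removed by compaction, the break condition is never met, so setting kafka-python's `consumer_timeout_ms` (which makes iteration stop after that many milliseconds without a message) is a sensible safety net.

```python
def consume_until_end(consumer, tp):
    """Iterate over `consumer` until the record at offset end - 1 is read."""
    end = consumer.end_offsets([tp])[tp]
    messages = []
    if consumer.position(tp) >= end:
        # Nothing to read; iterating would block waiting for new messages.
        return messages
    for message in consumer:
        messages.append(message)
        if message.offset >= end - 1:
            break  # reached the last record that existed when we started
    return messages
```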