Kafka Consumer poll messages with python

soa · Sep 10, 2017

I have problems polling messages from Kafka in a consumer group. My consumer object is bound to a single, given partition. First I create a TopicPartition with

self.ps = TopicPartition(topic, partition)

and then I assign the consumer to that partition:

self.consumer.assign([self.ps])

After that I am able to count the messages inside the partition with

self.consumer.seek_to_beginning(self.ps)
pos = self.consumer.position(self.ps)

and self.consumer.seek_to_end(self.ps) .....
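For reference, the counting step described above can be sketched as a small helper. This is only a sketch under assumptions: `count_messages` is a hypothetical name, `consumer` is assumed to be a kafka-python `KafkaConsumer` that has already been assigned `tp`, and the result counts offsets, which equals the number of messages only if the partition has no compaction gaps or transaction markers. It also rewinds to the first offset afterwards, since leaving the consumer at the end of the partition means a following poll() will only see new messages.

```python
def count_messages(consumer, tp):
    """Return the number of offsets between the start and end of partition ``tp``.

    Assumes ``consumer`` is a kafka-python KafkaConsumer already assigned to
    ``tp``; any object with seek_to_beginning/seek_to_end/position/seek works.
    """
    consumer.seek_to_beginning(tp)
    first = consumer.position(tp)  # earliest offset still available
    consumer.seek_to_end(tp)
    last = consumer.position(tp)   # offset the next new message would receive
    # Rewind so that a subsequent poll() starts from the beginning again
    # instead of waiting at the end of the partition.
    consumer.seek(tp, first)
    return last - first
```

Newer kafka-python releases also offer `beginning_offsets()` and `end_offsets()`, which return the same boundaries without moving the consumer's position.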

My topic contains over 30,000 messages. The problem is that I only get exactly one message.

Consumer configuration: max_poll_records=200, and AUTO_OFFSET_RESET is earliest.

Here is the function I use to get the messages:

def poll_messages(self):
    data = []
    messages = self.consumer.poll(timeout_ms=6000)
    for partition, msgs in six.iteritems(messages):
        for msg in msgs:
            data.append(msg)
    return data

Even if I seek to the first available offset before I start polling, I get only one message:

self.consumer.seek(self.ps, self.get_first_offset())

I hope someone can explain what I am doing wrong. Thanks in advance.

Best wishes Jörn

Answer

Nick · Nov 6, 2017

I believe you are misunderstanding max_poll_records: it does not mean you will get 200 records per poll, only that a single poll will return at most 200. You will need to call poll multiple times. I'd refer you to the docs for simple examples: http://kafka-python.readthedocs.io/en/master/usage.html
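Calling poll multiple times could look roughly like the sketch below. It is not the library's own method: `drain_partition`, `end_offset`, and `max_empty_polls` are hypothetical names, and `end_offset` is assumed to come from `seek_to_end()` plus `position()` (or `end_offsets()`). The loop relies on kafka-python's `poll()` returning a dict of `{TopicPartition: [ConsumerRecord, ...]}` capped at max_poll_records, and on `position()` advancing as records are returned.

```python
def drain_partition(consumer, tp, end_offset, timeout_ms=1000, max_empty_polls=3):
    """Poll repeatedly until ``end_offset`` is reached or polls keep coming back empty.

    A single poll() returns at most max_poll_records records, so reading a
    large partition takes many calls.
    """
    data = []
    empty_polls = 0
    while consumer.position(tp) < end_offset and empty_polls < max_empty_polls:
        batch = consumer.poll(timeout_ms=timeout_ms)
        if not batch:
            # Nothing arrived within timeout_ms; give up after a few tries
            # so the loop cannot spin forever on an unreachable end_offset.
            empty_polls += 1
            continue
        empty_polls = 0
        for records in batch.values():
            data.extend(records)
    return data
```

With max_poll_records=200 and roughly 30,000 messages, this would take on the order of 150 poll() calls rather than one.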

I believe a more standard implementation is to iterate over the consumer directly:

for message in self.consumer:
    # do stuff like:
    print(message)
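One caveat with the iterator approach: by default kafka-python's `consumer_timeout_ms` is infinite, so the loop waits for new messages forever. A minimal sketch, assuming the consumer is created with `consumer_timeout_ms` set so iteration stops after a quiet period; `read_all`, `make_consumer`, and the `bootstrap_servers` value are hypothetical and only illustrate the idea.

```python
from itertools import islice


def read_all(consumer, limit=None):
    """Collect messages by iterating over the consumer.

    Assumes ``consumer`` is a kafka-python KafkaConsumer created with
    consumer_timeout_ms, so iteration ends after that many milliseconds
    without new messages instead of blocking forever.
    ``limit`` optionally caps how many messages are read (None = no cap).
    """
    return list(islice(consumer, limit))


def make_consumer(topic, partition):
    # Imported here so read_all can be used without kafka-python installed.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(
        bootstrap_servers="localhost:9092",  # hypothetical broker address
        max_poll_records=200,
        auto_offset_reset="earliest",
        consumer_timeout_ms=6000,  # stop iterating after 6 s with no messages
    )
    tp = TopicPartition(topic, partition)
    consumer.assign([tp])
    consumer.seek_to_beginning(tp)
    return consumer
```

Usage would then be something like `for msg in read_all(make_consumer("my-topic", 0)): print(msg.offset, msg.value)`, where `"my-topic"` is a placeholder.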