I'm facing some serious problems trying to implement a solution for my needs, regarding KafkaConsumer (>=0.9).
Let's imagine I have a function that has to read just n messages from a kafka topic.
For example: getMsgs(5)
--> gets next 5 kafka messages in topic.
So, I have a loop that looks like this. Edited with actual correct parameters. In this case, the consumer's max.poll.records
param was set to 1, so the actual loop only iterated once. Different consumers(some of them iterated through many messages) shared an abstract father (this one), that's why it's coded that way. The numMss
part was ad-hoc for this consumer.
for (boolean exit= false;!exit;)
{
Records = consumer.poll(config.pollTime);
for (Record r:records)
{
processRecord(r); //do my things
numMss++;
if (numMss==maximum) //maximum=5
{
exit=true;
break;
}
}
}
Taking this into account, the problem is that the poll() method could get more than 5 messages. For example, if it gets 10 messages, my code will forget forever those other 5 messages, since Kafka will think they're already consumed.
I tried commiting the offset but doesn't seem to work:
consumer.commitSync(Collections.singletonMap(partition,
new OffsetAndMetadata(record.offset() + 1)));
Even with the offset configuration, whenever I launch again the consumer, it won't start from the 6th message (remember, I just wanted 5 messages), but from the 11th (since the first poll consumed 10 messages).
Is there any solution for this, or maybe (most surely) am I missing something?
Thanks in advance!!
You can set max.poll.records
to whatever number you like such that at most you will get that many records on each poll.
For your use case that you stated in this problem you don't have to commit offsets explicitly by yourself. you can just set enable.auto.commit
to true
and set auto.offset.reset
to earliest
such that it will kick in when there is no consumer group.id
(other words when you are about start reading from a partition for the very first time). Once you have a group.id and some consumer offsets stored in Kafka and in case your Kafka consumer process dies it will continue from the last committed offset since it is the default behavior because when a consumer starts it will first look for if there are any committed offsets and if so, will continue from the last committed offset and auto.offset.reset
won't kick in.