I'm writing a kafka
consumer using Java. I want to keep the real time of the message, so if there are too many messages waiting for consuming, such as 1000 or more, I should abandon the unconsumed messages and start consuming from the last offset.
For this problem, I try to compare the last committed offset and the end offset of a topic(only 1 partition), if the difference between these two offsets is larger than a certain amount, I will set the last committed offset of the topic as next offset so that I can abandon those redundant messages.
Now my problem is how to get the end offset of a topic, some people say I can use old consumer, but it's too complicated, do new consumer has this function?
The new consumer is also complicated.
//assign the topic
consumer.assign();
//seek to end of the topic
consumer.seekToEnd();
//the position is the latest offset
consumer.position();