I need to consume the messages produced to Kafka hour by hour over a day. Every hour I launch a job that consumes the messages produced during the previous hour; e.g., if the current time is 20:12, the job consumes the messages produced between 19:00:00 and 19:59:59. That means I need the start offset for time 19:00:00 and the end offset for time 19:59:59. I used SimpleConsumer.getOffsetsBefore as shown in the "0.8.0 SimpleConsumer Example". The problem is that the returned offset does not match the timestamp passed as a parameter; e.g., when I pass the timestamp for 19:00:00, I get a message produced at 16:38:00.
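For reference, the window described above (the full clock hour before the current one) can be expressed in epoch milliseconds, which is the unit Kafka timestamps use. This is a minimal sketch using java.time; the class name HourWindow and the method previousHour are illustrative and not part of any Kafka API. The end bound is exclusive (20:00:00.000), so it covers everything up to 19:59:59.999.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

public class HourWindow {
    /**
     * Returns {startMs, endMs} for the clock hour before `now`.
     * startMs is inclusive, endMs is exclusive.
     */
    public static long[] previousHour(ZonedDateTime now) {
        // Truncate to the top of the current hour: 20:12 -> 20:00:00.000
        ZonedDateTime end = now.truncatedTo(ChronoUnit.HOURS);
        // One hour earlier: 19:00:00.000
        ZonedDateTime start = end.minusHours(1);
        return new long[] {
            start.toInstant().toEpochMilli(),
            end.toInstant().toEpochMilli()
        };
    }

    public static void main(String[] args) {
        // Example date chosen arbitrarily; only the time of day (20:12) comes from the question.
        ZonedDateTime now = ZonedDateTime.of(2017, 1, 1, 20, 12, 0, 0, ZoneId.of("UTC"));
        long[] window = previousHour(now);
        System.out.println(Instant.ofEpochMilli(window[0]) + " .. " + Instant.ofEpochMilli(window[1]));
    }
}
```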
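The Kafka consumer API method offsetsForTimes() can be used for this. It is available in the Java consumer from version 0.10.1.0 onward, and the messages must be stored in the 0.10.0 or later message format, which carries timestamps. SimpleConsumer.getOffsetsBefore, by contrast, only resolves offsets at log-segment granularity, which is why the offset it returns can be hours earlier than the requested time. See the JavaDoc and implementation of offsetsForTimes() below (internally it delegates to fetcher.getOffsetsByTimes()):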
    /**
     * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
     * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
     *
     * This is a blocking call. The consumer does not have to be assigned the partitions.
     * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
     * will be returned for that partition.
     *
     * Notice that this method may block indefinitely if the partition does not exist.
     *
     * @param timestampsToSearch the mapping from partition to the timestamp to look up.
     * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
     *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
     *         such message.
     * @throws IllegalArgumentException if the target timestamp is negative.
     */
    @Override
    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
        for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
            // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
            // OffsetAndTimestamp is always positive.
            if (entry.getValue() < 0)
                throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
                        entry.getValue() + ". The target time cannot be negative.");
        }
        return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
    }
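
To apply this to the hourly job from the question, one possible approach is to call offsetsForTimes() twice per partition, once with the start of the hour and once with the start of the next hour, and treat the results as a half-open offset range. The sketch below assumes the kafka-clients library (0.10.1.0 or later) is on the classpath; the broker address localhost:9092, the topic name my-topic, and the class name HourlyOffsets are placeholders, not values from the question.

```java
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class HourlyOffsets {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        // Previous clock hour in epoch milliseconds: [startMs, endMs)
        ZonedDateTime end = ZonedDateTime.now().truncatedTo(ChronoUnit.HOURS);
        long endMs = end.toInstant().toEpochMilli();
        long startMs = end.minusHours(1).toInstant().toEpochMilli();

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // "my-topic" is a placeholder; the topic is assumed to exist.
            List<TopicPartition> partitions = consumer.partitionsFor("my-topic").stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());

            Map<TopicPartition, Long> startQuery = new HashMap<>();
            Map<TopicPartition, Long> endQuery = new HashMap<>();
            for (TopicPartition tp : partitions) {
                startQuery.put(tp, startMs);
                endQuery.put(tp, endMs);
            }

            Map<TopicPartition, OffsetAndTimestamp> starts = consumer.offsetsForTimes(startQuery);
            Map<TopicPartition, OffsetAndTimestamp> ends = consumer.offsetsForTimes(endQuery);
            // Fallback for partitions with no message at or after endMs yet.
            Map<TopicPartition, Long> logEnds = consumer.endOffsets(partitions);

            for (TopicPartition tp : partitions) {
                OffsetAndTimestamp s = starts.get(tp);
                if (s == null) {
                    // No message with timestamp >= startMs in this partition.
                    continue;
                }
                OffsetAndTimestamp e = ends.get(tp);
                long endOffset = (e != null) ? e.offset() : logEnds.get(tp);
                // Consume offsets in [s.offset(), endOffset) for this partition.
                System.out.printf("%s: [%d, %d)%n", tp, s.offset(), endOffset);
            }
        }
    }
}
```

The job can then assign() the partitions, seek() to each start offset, and poll until it reaches the end offset. If producers set timestamps themselves, timestamps within a partition are not guaranteed to be ordered by offset, so it may still be worth checking each record's timestamp() against the window while consuming.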