In Kafka, how do I get the exact offset for a given producing time?

Po Zhou · Mar 21, 2014 · Viewed 24.2k times

I need to process the messages produced to Kafka hour by hour throughout the day. Every hour I launch a job that consumes the messages produced during the previous hour. For example, if the current time is 20:12, the job consumes the messages produced between 19:00:00 and 19:59:59. That means I need the start offset for 19:00:00 and the end offset for 19:59:59. I used SimpleConsumer.getOffsetsBefore as shown in the "0.8.0 SimpleConsumer Example". The problem is that the returned offset does not match the timestamp passed as a parameter. For example, when I pass the timestamp for 19:00:00, I get a message produced at 16:38:00.
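To make the window concrete, here is a minimal sketch of how the boundaries of the previous clock hour could be computed as epoch milliseconds. The class and method names (HourWindow, previousHour) are only illustrative, and the end bound is treated as exclusive (20:00:00) rather than 19:59:59 so that no millisecond is missed.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

public class HourWindow {

    /**
     * Returns {startMillis, endMillisExclusive} for the full clock hour
     * that precedes 'now' in the given zone.
     * For now = 20:12 the window is [19:00:00, 20:00:00).
     */
    public static long[] previousHour(Instant now, ZoneId zone) {
        // Truncate to the top of the current hour: this is the exclusive end.
        ZonedDateTime endOfWindow = now.atZone(zone).truncatedTo(ChronoUnit.HOURS);
        // One hour earlier is the inclusive start.
        ZonedDateTime startOfWindow = endOfWindow.minusHours(1);
        return new long[] {
            startOfWindow.toInstant().toEpochMilli(),
            endOfWindow.toInstant().toEpochMilli()
        };
    }

    public static void main(String[] args) {
        long[] window = previousHour(Instant.now(), ZoneId.systemDefault());
        System.out.println("start (inclusive) = " + Instant.ofEpochMilli(window[0]));
        System.out.println("end   (exclusive) = " + Instant.ofEpochMilli(window[1]));
    }
}
```

These two millisecond values are the timestamps I want to translate into a start offset and an end offset.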

Answer

Liju John · Jul 20, 2017

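The Kafka consumer API method offsetsForTimes() can be used for this. It is available in the Java consumer from version 0.10.1.0 onward (message timestamps themselves were introduced in 0.10.0); internally it delegates to the fetcher's getOffsetsByTimes(). See JavaDoc.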

/**
 * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
 * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
 *
 * This is a blocking call. The consumer does not have to be assigned the partitions.
 * If the message format version in a partition is before 0.10.0, i.e. the messages do not have timestamps, null
 * will be returned for that partition.
 *
 * Notice that this method may block indefinitely if the partition does not exist.
 *
 * @param timestampsToSearch the mapping from partition to the timestamp to look up.
 * @return a mapping from partition to the timestamp and offset of the first message with timestamp greater
 *         than or equal to the target timestamp. {@code null} will be returned for the partition if there is no
 *         such message.
 * @throws IllegalArgumentException if the target timestamp is negative.
 */
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
    for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
        // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
        // OffsetAndTimestamp is always positive.
        if (entry.getValue() < 0)
            throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
                    entry.getValue() + ". The target time cannot be negative.");
    }
    return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
}
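As a rough illustration of how this could be applied to the hourly job in the question, the sketch below looks up a start and an end offset for every partition of a topic. It assumes kafka-clients 0.10.1.0 or later on the classpath and a reachable broker; the class name OffsetsForHour, the helper query(), the topic name "my-topic" and the address localhost:9092 are placeholders, not anything from the original post. Note that the lookup is based on the timestamps stored with the messages, so the result depends on how the topic's timestamps are set.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetsForHour {

    /** Builds the lookup map: every partition is searched for the same timestamp. */
    public static Map<TopicPartition, Long> query(List<TopicPartition> partitions, long timestampMs) {
        Map<TopicPartition, Long> result = new HashMap<>();
        for (TopicPartition tp : partitions) {
            result.put(tp, timestampMs);
        }
        return result;
    }

    public static void main(String[] args) {
        if (args.length != 2) {
            System.err.println("usage: OffsetsForHour <startMillis> <endMillisExclusive>");
            return;
        }
        String topic = "my-topic";              // placeholder topic name
        long startMs = Long.parseLong(args[0]); // e.g. 19:00:00 as epoch millis
        long endMs = Long.parseLong(args[1]);   // e.g. 20:00:00 as epoch millis

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = consumer.partitionsFor(topic).stream()
                    .map(p -> new TopicPartition(p.topic(), p.partition()))
                    .collect(Collectors.toList());

            // Earliest offset with timestamp >= startMs / >= endMs, per partition.
            Map<TopicPartition, OffsetAndTimestamp> starts =
                    consumer.offsetsForTimes(query(partitions, startMs));
            Map<TopicPartition, OffsetAndTimestamp> ends =
                    consumer.offsetsForTimes(query(partitions, endMs));
            // Fallback when nothing has been produced at or after endMs yet.
            Map<TopicPartition, Long> logEnds = consumer.endOffsets(partitions);

            for (TopicPartition tp : partitions) {
                OffsetAndTimestamp start = starts.get(tp);
                if (start == null) {
                    System.out.println(tp + ": no messages at or after the start time");
                    continue;
                }
                OffsetAndTimestamp end = ends.get(tp);
                long endExclusive = (end != null) ? end.offset() : logEnds.get(tp);
                System.out.println(tp + ": consume offsets [" + start.offset() + ", " + endExclusive + ")");
            }
        }
    }
}
```

A job could then assign() the partitions, seek() each one to its start offset, and stop polling once it reaches the exclusive end offset. Using the top of the next hour as an exclusive bound avoids having to query for 19:59:59 separately.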