How to access Kafka headers while consuming a message?

iamiddy · Jul 16, 2015

Below is my configuration:

<int-kafka:inbound-channel-adapter id="kafkaInboundChannelAdapter"
        kafka-consumer-context-ref="consumerContext"
        auto-startup="true"
        channel="inputFromKafka">
    <int:poller fixed-delay="1" time-unit="MILLISECONDS" />
</int-kafka:inbound-channel-adapter>

Messages on inputFromKafka go through the transformation below:

public Message<?> transform(final Message<?> message) {

    System.out.println("KAFKA Message Headers " + message.getHeaders());

    final Map<String, Map<Integer, List<Object>>> origData =
            (Map<String, Map<Integer, List<Object>>>) message.getPayload();

    // some code to figure out the nonPartitionedData
    return MessageBuilder.withPayload(nonPartitionedData).build();
}

The print statement above always prints the same two headers, regardless of what was sent:

KAFKA Message Headers {id=9c8f09e6-4b28-5aa1-c74c-ebfa53c01ae4, timestamp=1437066957272}

When sending the Kafka message, I passed some headers, including KafkaHeaders.MESSAGE_KEY, but I am not getting that one back either. Is there a way to accomplish this?

Answer

Artem Bilan · Jul 16, 2015

Unfortunately it doesn't work that way...

The Producer part (KafkaProducerMessageHandler) looks like this:

this.kafkaProducerContext.send(topic, partitionId, messageKey, message.getPayload());

As you can see, we don't send any message headers to the Kafka topic. We send only the payload, stored under that messageKey, as the Kafka protocol specifies.

On the other side, the consumer (KafkaHighLevelConsumerMessageSource) does this:

if (!payloadMap.containsKey(messageAndMetadata.partition())) {
    final List<Object> payload = new ArrayList<Object>();
    payload.add(messageAndMetadata.message());
    payloadMap.put(messageAndMetadata.partition(), payload);
}

As you can see, we don't care about the messageKey here: only messageAndMetadata.message() ends up in the payload.
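To make the effect concrete, here is a minimal, self-contained sketch of the same grouping idea. It is not the library code: ConsumedRecord and groupByPartition are hypothetical stand-ins for MessageAndMetadata and the adapter's internal logic, and the payloads are plain strings. The point is only that the key is never read, so nothing downstream can recover it.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PartitionGroupingSketch {

    // Hypothetical stand-in for Kafka's MessageAndMetadata: key, partition, message body.
    static final class ConsumedRecord {
        final String key;
        final int partition;
        final Object message;

        ConsumedRecord(String key, int partition, Object message) {
            this.key = key;
            this.partition = partition;
            this.message = message;
        }
    }

    // Groups message bodies by partition. The key field is never touched,
    // which mirrors why the message key does not reach the payload or headers.
    static Map<Integer, List<Object>> groupByPartition(List<ConsumedRecord> records) {
        Map<Integer, List<Object>> payloadMap = new LinkedHashMap<>();
        for (ConsumedRecord rec : records) {
            payloadMap.computeIfAbsent(rec.partition, p -> new ArrayList<>())
                      .add(rec.message);
        }
        return payloadMap;
    }

    public static void main(String[] args) {
        List<ConsumedRecord> records = new ArrayList<>();
        records.add(new ConsumedRecord("k1", 0, "a"));
        records.add(new ConsumedRecord("k2", 1, "b"));
        records.add(new ConsumedRecord("k3", 0, "c"));
        // prints {0=[a, c], 1=[b]}
        System.out.println(groupByPartition(records));
    }
}
```

The keys k1, k2 and k3 exist on the consumed records but are absent from the result, which is the behavior described in the question.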

The KafkaMessageDrivenChannelAdapter (<int-kafka:message-driven-channel-adapter>) is for you! It does this before sending the message to the channel:

KafkaMessageHeaders kafkaMessageHeaders = new KafkaMessageHeaders(generateMessageId, generateTimestamp);

Map<String, Object> rawHeaders = kafkaMessageHeaders.getRawHeaders();
rawHeaders.put(KafkaHeaders.MESSAGE_KEY, key);
rawHeaders.put(KafkaHeaders.TOPIC, metadata.getPartition().getTopic());
rawHeaders.put(KafkaHeaders.PARTITION_ID, metadata.getPartition().getId());
rawHeaders.put(KafkaHeaders.OFFSET, metadata.getOffset());
rawHeaders.put(KafkaHeaders.NEXT_OFFSET, metadata.getNextOffset());

if (!this.autoCommitOffset) {
    rawHeaders.put(KafkaHeaders.ACKNOWLEDGMENT, acknowledgment);
}
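Once the adapter populates these headers, a downstream transformer can read them by name. The sketch below imitates that with a plain Map so it runs without Spring on the classpath. The header name strings, buildHeaders and describe are all hypothetical; the real constants live in KafkaHeaders and their string values may differ. In an actual transformer, message.getHeaders() is itself a Map<String, Object>, so the same get(...) lookups should apply with the KafkaHeaders constants.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KafkaHeaderSketch {

    // Hypothetical header names used only in this sketch (not the real KafkaHeaders values).
    static final String MESSAGE_KEY = "sketch_messageKey";
    static final String TOPIC = "sketch_topic";
    static final String PARTITION_ID = "sketch_partitionId";
    static final String OFFSET = "sketch_offset";
    static final String NEXT_OFFSET = "sketch_nextOffset";

    // Imitates what the message-driven adapter does: record the Kafka metadata as headers.
    static Map<String, Object> buildHeaders(Object key, String topic, int partitionId,
                                            long offset, long nextOffset) {
        Map<String, Object> rawHeaders = new LinkedHashMap<>();
        rawHeaders.put(MESSAGE_KEY, key);
        rawHeaders.put(TOPIC, topic);
        rawHeaders.put(PARTITION_ID, partitionId);
        rawHeaders.put(OFFSET, offset);
        rawHeaders.put(NEXT_OFFSET, nextOffset);
        return rawHeaders;
    }

    // What a downstream transformer could do: look the values up by header name.
    static String describe(Map<String, Object> headers) {
        return headers.get(TOPIC) + "/" + headers.get(PARTITION_ID)
                + "@" + headers.get(OFFSET)
                + " key=" + headers.get(MESSAGE_KEY);
    }

    public static void main(String[] args) {
        Map<String, Object> headers = buildHeaders("k1", "orders", 2, 41L, 42L);
        // prints orders/2@41 key=k1
        System.out.println(describe(headers));
    }
}
```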