how to fetch a field in ConsumerRecord

zhenghuagui picture zhenghuagui · Jul 7, 2016 · Viewed 7.1k times · Source

I wrote a python script:

#!/usr/bin/env python
from kafka import KafkaConsumer
consumer = KafkaConsumer('dimon_tcpdump',group_id='zhg_group',bootstrap_servers='192.168.100.9:9092')
for msg in consumer:
    print msg
    # process mes here

the msg output is like:

ConsumerRecord(topic=u'ditopic', partition=0, offset=6280, timestamp=None, timestamp_type=None, key=None, value='myvalue')

I know the output is a namedtuple form.

My problem is: how can I get a specific field of the ConsumerRecord? For example, I want to assign the value string to a variable.

Answer

Dominic Cabral picture Dominic Cabral · Jan 31, 2018

It might have to do with how your deserializing the data. For example, if you wanted to grab some JSON from the msg. You would initialize the Consumer with:

value_deserializer=lambda m: json.loads(m.decode('utf-8'))

So your code would look something like this:

#!/usr/bin/env python
from kafka import KafkaConsumer
consumer = KafkaConsumer(
   'dimon_tcpdump',
    group_id='zhg_group',
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    bootstrap_servers='192.168.100.9:9092'
    )
for msg in consumer:
    print msg.value
    # process mes here