How to subscribe to a list of multiple Kafka wildcard patterns using kafka-python?

Ben Harrison · Sep 15, 2016 · Viewed 16.6k times

I'm subscribing to Kafka topics using a pattern with a wildcard, as shown below. The wildcard represents a dynamic customer ID.

consumer.subscribe(pattern='customer.*.validations')

This works well because I can pluck the customer ID from the topic string. But now I need to expand the functionality to listen to a similar topic for a slightly different purpose; let's call it customer.*.additional-validations. The code needs to live in the same project because so much functionality is shared, but I need to be able to take a different path based on the type of topic.
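A minimal sketch of the extraction and branching described above, using only Python's re module. The topic layout customer.<id>.<kind> comes from the question; the names TOPIC_RE, parse_topic, dispatch, handle_validation and handle_additional_validation are hypothetical, and the assumption that a customer ID never contains a dot is just that, an assumption.

```python
import re

# Hypothetical layout from the question: customer.<customer_id>.<kind>
# Assumes the customer ID itself contains no dots.
TOPIC_RE = re.compile(
    r"^customer\.(?P<customer_id>[^.]+)\.(?P<kind>validations|additional-validations)$"
)


def parse_topic(topic):
    """Return (customer_id, kind) for a matching topic name, else None."""
    m = TOPIC_RE.match(topic)
    if m is None:
        return None
    return m.group("customer_id"), m.group("kind")


def handle_validation(customer_id, value):
    # Placeholder for the existing "validations" path.
    return ("validation", customer_id, value)


def handle_additional_validation(customer_id, value):
    # Placeholder for the new "additional-validations" path.
    return ("additional", customer_id, value)


# Map each topic kind to the code path that should handle it.
HANDLERS = {
    "validations": handle_validation,
    "additional-validations": handle_additional_validation,
}


def dispatch(topic, value):
    """Route a message to a handler based on the topic it came from."""
    parsed = parse_topic(topic)
    if parsed is None:
        raise ValueError(f"unexpected topic: {topic!r}")
    customer_id, kind = parsed
    return HANDLERS[kind](customer_id, value)
```

Inside a kafka-python consumer loop this would be called as dispatch(msg.topic, msg.value), since each ConsumerRecord carries the name of the topic it was read from.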

In the kafka-python documentation, I can see that it is possible to subscribe to a list of topics. However, these are hard-coded strings, not patterns that allow for flexibility.

>>> import msgpack
>>> from kafka import KafkaConsumer
>>> # Deserialize msgpack-encoded values
>>> consumer = KafkaConsumer(value_deserializer=msgpack.loads)
>>> consumer.subscribe(['msgpackfoo'])
>>> for msg in consumer:
...     assert isinstance(msg.value, dict)

So I'm wondering whether it is possible to combine the two somehow, something like this (non-working):

consumer.subscribe(pattern=['customer.*.validations', 'customer.*.additional-validations'])

Answer

DhruvPathak · Sep 19, 2016

The KafkaConsumer code supports either a list of topics or a single pattern:

https://github.com/dpkp/kafka-python/blob/68c8fa4ad01f8fef38708f257cb1c261cfac01ab/kafka/consumer/group.py#L717

    def subscribe(self, topics=(), pattern=None, listener=None):
        """Subscribe to a list of topics, or a topic regex pattern
        Partitions will be dynamically assigned via a group coordinator.
        Topic subscriptions are not incremental: this list will replace the
        current assignment (if there is one).

So you can build one regex that joins the patterns with the | (OR) operator. That should work for subscribing to multiple dynamic topic patterns, because kafka-python uses Python's re module internally to match topic names against the pattern.

consumer.subscribe(pattern='(customer.*.validations)|(customer.*.additional-validations)')
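One way to check a combined pattern offline, with no broker, is to run it through re the way kafka-python appears to: this sketch assumes the pattern argument is compiled with re.compile and each topic name is tested with .match(), which anchors only at the start of the name. The helpers combine_patterns and subscribed and the sample topic names are hypothetical. A side effect worth knowing: because . is a regex metacharacter and the match is not anchored at the end, customer.*.validations on its own already matches customer.42.additional-validations (the . before validations matches the -). Escaping the dots and anchoring with $ makes each alternative select only the topics it names.

```python
import re


def combine_patterns(patterns):
    """Join several topic regexes into one alternation for subscribe(pattern=...)."""
    # Wrap each in a non-capturing group so '|' applies to whole patterns.
    return "|".join(f"(?:{p})" for p in patterns)


# The loose patterns from the question, and a stricter escaped/anchored pair.
loose = combine_patterns(["customer.*.validations", "customer.*.additional-validations"])
strict = combine_patterns([
    r"customer\.[^.]+\.validations$",
    r"customer\.[^.]+\.additional-validations$",
])

topics = [
    "customer.42.validations",
    "customer.42.additional-validations",
    "customer.42.validations-archive",
    "orders.42.validations",
]


def subscribed(pattern, names):
    """Topics a pattern would select, assuming re.match() semantics."""
    compiled = re.compile(pattern)
    return [t for t in names if compiled.match(t)]


# With kafka-python the combined string would then be passed as, e.g.:
#     consumer.subscribe(pattern=strict)
```

Here subscribed(loose, topics) also picks up customer.42.validations-archive, while subscribed(strict, topics) returns only the two intended topics. The non-capturing (?:...) wrappers keep each joined pattern self-contained, so a pattern that contains its own | still combines correctly.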