Python and RabbitMQ - Best way to listen to consume events from multiple channels?

blindsnowmobile picture blindsnowmobile · Feb 16, 2015 · Viewed 26.6k times · Source

I have two, separate RabbitMQ instances. I'm trying to find the best way to listen to events from both.

For example, I can consume events on one with the following:

credentials = pika.PlainCredentials(user, pass)
connection = pika.BlockingConnection(pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(Exclusive=True)
self.channel.queue_bind(exchange="my-exchange", result.method.queue, routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
self.channel.start_consuming()

I have a second host, "host2", that I'd like to listen to as well. I thought about creating two separate threads to do this, but from what I've read, pika isn't thread safe. Is there a better way? Or would creating two separate threads, each listening to a different Rabbit instance (host1, and host2) be sufficient?

Answer

Unit03 picture Unit03 · Feb 17, 2015

The answer to "what is the best way" depends heavily on your usage pattern of queues and what you mean by "best". Since I can't comment on questions yet, I'll just try to suggest some possible solutions.

In each example I'm going to assume exchange is already declared.

Threads

You can consume messages from two queues on separate hosts in single process using pika.

You are right - as its own FAQ states, pika is not thread safe, but it can be used in multi-threaded manner by creating connections to RabbitMQ hosts per thread. Making this example run in threads using threading module looks as follows:

import pika
import threading


class ConsumerThread(threading.Thread):
    def __init__(self, host, *args, **kwargs):
        super(ConsumerThread, self).__init__(*args, **kwargs)

        self._host = host

    # Not necessarily a method.
    def callback_func(self, channel, method, properties, body):
        print("{} received '{}'".format(self.name, body))

    def run(self):
        credentials = pika.PlainCredentials("guest", "guest")

        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self._host,
                                      credentials=credentials))

        channel = connection.channel()

        result = channel.queue_declare(exclusive=True)

        channel.queue_bind(result.method.queue,
                           exchange="my-exchange",
                           routing_key="*.*.*.*.*")

        channel.basic_consume(self.callback_func,
                              result.method.queue,
                              no_ack=True)

        channel.start_consuming()


if __name__ == "__main__":
    threads = [ConsumerThread("host1"), ConsumerThread("host2")]
    for thread in threads:
        thread.start()

I've declared callback_func as a method purely to use ConsumerThread.name while printing message body. It might as well be a function outside the ConsumerThread class.

Processes

Alternatively, you can always just run one process with consumer code per queue you want to consume events.

import pika
import sys


def callback_func(channel, method, properties, body):
    print(body)


if __name__ == "__main__":
    credentials = pika.PlainCredentials("guest", "guest")

    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=sys.argv[1],
                                  credentials=credentials))

    channel = connection.channel()

    result = channel.queue_declare(exclusive=True)

    channel.queue_bind(result.method.queue,
                       exchange="my-exchange",
                       routing_key="*.*.*.*.*")

    channel.basic_consume(callback_func, result.method.queue, no_ack=True)

    channel.start_consuming()

and then run by:

$ python single_consume.py host1
$ python single_consume.py host2  # e.g. on another console

If the work you're doing on messages from queues is CPU-heavy and as long as number of cores in your CPU >= number of consumers, it is generally better to use this approach - unless your queues are empty most of the time and consumers won't utilize this CPU time*.

Async

Another alternative is to involve some asynchronous framework (for example Twisted) and running whole thing in single thread.

You can no longer use BlockingConnection in asynchronous code; fortunately, pika has adapter for Twisted:

from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.connection import ConnectionParameters
from twisted.internet import protocol, reactor, task
from twisted.python import log


class Consumer(object):
    def on_connected(self, connection):
        d = connection.channel()
        d.addCallback(self.got_channel)
        d.addCallback(self.queue_declared)
        d.addCallback(self.queue_bound)
        d.addCallback(self.handle_deliveries)
        d.addErrback(log.err)

    def got_channel(self, channel):
        self.channel = channel

        return self.channel.queue_declare(exclusive=True)

    def queue_declared(self, queue):
        self._queue_name = queue.method.queue

        self.channel.queue_bind(queue=self._queue_name,
                                exchange="my-exchange",
                                routing_key="*.*.*.*.*")

    def queue_bound(self, ignored):
        return self.channel.basic_consume(queue=self._queue_name)

    def handle_deliveries(self, queue_and_consumer_tag):
        queue, consumer_tag = queue_and_consumer_tag
        self.looping_call = task.LoopingCall(self.consume_from_queue, queue)

        return self.looping_call.start(0)

    def consume_from_queue(self, queue):
        d = queue.get()

        return d.addCallback(lambda result: self.handle_payload(*result))

    def handle_payload(self, channel, method, properties, body):
        print(body)


if __name__ == "__main__":
    consumer1 = Consumer()
    consumer2 = Consumer()

    parameters = ConnectionParameters()
    cc = protocol.ClientCreator(reactor,
                                TwistedProtocolConnection,
                                parameters)
    d1 = cc.connectTCP("host1", 5672)
    d1.addCallback(lambda protocol: protocol.ready)
    d1.addCallback(consumer1.on_connected)
    d1.addErrback(log.err)

    d2 = cc.connectTCP("host2", 5672)
    d2.addCallback(lambda protocol: protocol.ready)
    d2.addCallback(consumer2.on_connected)
    d2.addErrback(log.err)

    reactor.run()

This approach would be even better, the more queues you would consume from and the less CPU-bound the work performing by consumers is*.

Python 3

Since you've mentioned pika, I've restricted myself to Python 2.x-based solutions, because pika is not yet ported.

But in case you would want to move to >=3.3, one possible option is to use asyncio with one of AMQP protocol (the protocol you speak in with RabbitMQ) , e.g. asynqp or aioamqp.

* - please note that these are very shallow tips - in most cases choice is not that obvious; what will be the best for you depends on queues "saturation" (messages/time), what work do you do upon receiving these messages, what environment you run your consumers in etc.; there's no way to be sure other than to benchmark all implementations