I've got a Python worker client that spins up 10 workers, each of which hooks onto a RabbitMQ queue. A bit like this:
#!/usr/bin/python
import multiprocessing
import pika

worker_count = 10

def mqworker(queue, configurer):
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='mqhost'))
    channel = connection.channel()
    channel.queue_declare(queue=qname, durable=True)
    channel.basic_consume(callback, queue=qname, no_ack=False)
    channel.basic_qos(prefetch_count=1)
    channel.start_consuming()

def callback(ch, method, properties, body):
    doSomeWork()
    ch.basic_ack(delivery_tag=method.delivery_tag)

if __name__ == '__main__':
    for i in range(worker_count):
        worker = multiprocessing.Process(target=mqworker)
        worker.start()
The issue I have is that, despite setting basic_qos on the channel, the first worker to start accepts all the messages off the queue, whilst the others sit there idle. I can see this in the RabbitMQ interface: even when I set worker_count to 1 and dump 50 messages on the queue, all 50 go into the 'unacknowledged' bucket, whereas I'd expect 1 to become unacknowledged and the other 49 to stay ready.
Why isn't this working?
I appear to have solved this by moving where basic_qos is called. Placing it just after channel = connection.channel() appears to alter the behaviour to what I'd expect.
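For anyone hitting the same thing, here is a minimal sketch of the reordered setup. My understanding is that a per-consumer prefetch limit only applies to consumers registered after basic_qos is called, which would explain why the original order had no effect. The helper name configure_channel and the host default are my own, purely for illustration, and the basic_consume call uses the older pika style from the question (as far as I know, pika 1.0 and later expect queue, on_message_callback and auto_ack instead).

```python
def configure_channel(channel, qname, on_message):
    """Declare the queue, set the prefetch limit, then start consuming.

    configure_channel is a hypothetical helper; `channel` is expected to
    behave like a pika BlockingChannel.
    """
    channel.queue_declare(queue=qname, durable=True)
    # Set the prefetch limit BEFORE registering the consumer, so the
    # consumer is created with at most one unacknowledged message.
    channel.basic_qos(prefetch_count=1)
    # Older pika call style, as in the question (callback first, no_ack).
    channel.basic_consume(on_message, queue=qname, no_ack=False)


def mqworker(qname, on_message, host='mqhost'):
    """Worker entry point: connect, configure the channel, consume forever."""
    # Imported here so configure_channel can be exercised without pika installed.
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host=host))
    channel = connection.channel()
    configure_channel(channel, qname, on_message)
    channel.start_consuming()
```

Each worker process would then be started with something like multiprocessing.Process(target=mqworker, args=(qname, callback)), where qname is whatever queue name you use. With this order, 50 queued messages and one worker should show 1 unacknowledged and 49 ready.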