pika, stop_consuming does not work

laike9m · Dec 23, 2014

I'm new to RabbitMQ and pika, and I'm having trouble stopping a consumer.

Channel and queue setup:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue=new_task_id, durable=True, auto_delete=True)

Basically, the consumer and producer look like this:

consumer:

def task(task_id):
    def callback(channel, method, properties, body):
        if body != "quit":
            print(body)
        else:
            print(body)
            channel.stop_consuming(task_id)

    channel.basic_consume(callback, queue=task_id, no_ack=True)
    channel.start_consuming()
    print("finish")
    return "finish"

producer:

proc = Popen(['app/sample.sh'], shell=True, stdout=PIPE)
while proc.returncode is None:  # running
    line = proc.stdout.readline()
    if line:
        channel.basic_publish(
            exchange='',
            routing_key=self.request.id,
            body=line
        )
    else:
        channel.basic_publish(
            exchange='',
            routing_key=self.request.id,
            body="quit"
        )
        break

The consumer task gave me this output:

# ... output from sample.sh, as expected

quit
�}q(UstatusqUSUCCESSqU  tracebackqNUresultqNUtask_idqU
1419350416qUchildrenq]u.

However, "finish" was never printed, so I'm guessing that channel.stop_consuming(task_id) didn't actually stop the consumer. If so, what is the correct way to do it? Thank you.

Answer

shx2 · Feb 14, 2016

I had the same problem. It seems to be caused by the fact that internally, start_consuming calls self.connection.process_data_events(time_limit=None). This time_limit=None makes it hang.

I managed to work around this problem by replacing the call to channel.start_consuming() with a hacked copy of its implementation:

while channel._consumer_infos:
    channel.connection.process_data_events(time_limit=1) # 1 second
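
A variation on the same polling loop, as a minimal sketch rather than a tested fix: instead of reading the private attribute channel._consumer_infos, it tracks a local flag that the callback sets when it sees "quit". The function name consume_until_quit and the poll_seconds parameter are made up for illustration, and the basic_consume call uses the same argument order as the question (newer pika releases changed that signature, so adjust accordingly).

```python
def consume_until_quit(channel, queue, poll_seconds=1):
    """Print messages from `queue` until a "quit" body arrives.

    Hypothetical helper: `channel` is assumed to be a pika BlockingChannel
    opened the same way as in the question.
    """
    # A dict lets the nested callback mutate the flag on Python 2 and 3.
    state = {"done": False}

    def callback(ch, method, properties, body):
        print(body)
        # Accept both str and bytes, since the body type depends on the
        # Python / pika version in use.
        if body in ("quit", b"quit"):
            state["done"] = True

    # Same call shape as in the question (older pika releases).
    consumer_tag = channel.basic_consume(callback, queue=queue, no_ack=True)

    # Poll with a finite time limit so the loop re-checks the flag
    # regularly instead of blocking indefinitely.
    while not state["done"]:
        channel.connection.process_data_events(time_limit=poll_seconds)

    # Cancel only the consumer registered above.
    channel.basic_cancel(consumer_tag)
    return "finish"
```

With this, the body of task(task_id) from the question would reduce to something like return consume_until_quit(channel, task_id). The tag passed to basic_cancel is the one returned by basic_consume, not the queue name.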