I am using "hello world" tutorial in :http://www.rabbitmq.com/tutorials/tutorial-two-python.html .
worker.py
looks like this
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print ' [*] Waiting for messages. To exit press CTRL+C'
def callback(ch, method, properties, body):
print " [x] Received %r" % (body,)
time.sleep( body.count('.') )
print " [x] Done"
ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
queue='task_queue')
channel.start_consuming()
I have used this code to implement in my work. Everything works smoothly untill there comes a point in a queue for which it raises an exception after printing [x] Done
Traceback (most recent call last):
File "hullworker2.py", line 242, in <module>
channel.basic_consume(callback,queue='test_queue2')
File "/usr/local/lib/python2.7/dist-packages/pika/channel.py", line 211, in basic_consume
{'consumer_tag': consumer_tag})])
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 904, in _rpc
self.connection.process_data_events()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 88, in process_data_events
if self._handle_read():
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 184, in _handle_read
super(BlockingConnection, self)._handle_read()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/base_connection.py", line 300, in _handle_read
return self._handle_error(error)
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/base_connection.py", line 264, in _handle_error
self._handle_disconnect()
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 181, in _handle_disconnect
self._on_connection_closed(None, True)
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 232, in _on_connection_closed
self._channels[channel]._on_close(method_frame)
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 817, in _on_close
self._send_method(spec.Channel.CloseOk(), None, False)
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 920, in _send_method
self.connection.send_method(self.channel_number, method_frame, content)
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 120, in send_method
self._send_method(channel_number, method_frame, content)
File "/usr/local/lib/python2.7/dist-packages/pika/connection.py", line 1331, in _send_method
self._send_frame(frame.Method(channel_number, method_frame))
File "/usr/local/lib/python2.7/dist-packages/pika/adapters/blocking_connection.py", line 245, in _send_frame
super(BlockingConnection, self)._send_frame(frame_value)
File "/usr/local/lib/python2.7/dist-packages/pika/connection.py", line 1312, in _send_frame
raise exceptions.ConnectionClosed
pika.exceptions.ConnectionClosed
I don't understand how the connection is closing automatically in between the process. Process runs fine for 100's of messages in the queue then suddenly this error comes up. Any help appreciated.
There is a concept of heartbeats
. It's basically a way how the server can make sure that the client is still connected.
when you do
time.sleep( body.count('.') )
You blocking the code by N
number of seconds. It means that if server would like to send a heartbeat
frame to check if your client is still alive, then it will not get a response back, because your code is blocked and doesn't know if heartbeat arrived.
Instead of using time.sleep()
you should use connection.sleep()
this will also make the code "sleep" for N
number of seconds, but it will also communicate with the server and will respond back.
sleep(duration)[source]
A safer way to sleep than calling time.sleep() directly which will keep the adapter from ignoring frames sent from RabbitMQ. The connection will “sleep” or block the number of seconds specified in duration in small intervals.