I'm trying to send a Python dictionary from a Python producer to a Python consumer using RabbitMQ. The producer first establishes a connection to the local RabbitMQ server. Then it declares the queue to which the message will be delivered, and finally sends the message. The consumer first connects to the RabbitMQ server and then makes sure the queue exists by declaring the same queue. It then receives the message from the producer in the callback function and prints the 'id' value (1). Here are the scripts for the producer and the consumer:
producer.py script:
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
message = {'id': 1, 'name': 'name1'}
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()
consumer.py script:
import pika
import time
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    print(body['id'])
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')
channel.start_consuming()
But when I run producer.py, I get this error:
line 18, in <module>
    delivery_mode = 2, # make message persistent
  File "/Library/Python/2.7/site-packages/pika/adapters/blocking_connection.py", line 1978, in basic_publish
    mandatory, immediate)
  File "/Library/Python/2.7/site-packages/pika/adapters/blocking_connection.py", line 2064, in publish
    immediate=immediate)
  File "/Library/Python/2.7/site-packages/pika/channel.py", line 338, in basic_publish
    (properties, body))
  File "/Library/Python/2.7/site-packages/pika/channel.py", line 1150, in _send_method
    self.connection._send_method(self.channel_number, method_frame, content)
  File "/Library/Python/2.7/site-packages/pika/connection.py", line 1571, in _send_method
    self._send_message(channel_number, method_frame, content)
  File "/Library/Python/2.7/site-packages/pika/connection.py", line 1596, in _send_message
    content[1][s:e]).marshal())
TypeError: unhashable type
Could anybody help me? Thanks!
You can't send native Python objects such as a dict as your payload. The message body has to be a string (bytes), so you have to serialize the dict first. I recommend using JSON. In the producer:
import json
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=json.dumps(message),
                      properties=pika.BasicProperties(
                          delivery_mode = 2, # make message persistent
                      ))
and in the consumer:
def callback(ch, method, properties, body):
    message = json.loads(body)  # decode the JSON string back into a dict
    print(" [x] Received %r" % message)
    print(message['id'])
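If you want to check the serialization step on its own, without a broker running, here is a minimal sketch of the round trip. The helper names encode_message and decode_message are hypothetical (they are not part of pika); the idea is only that whatever encode_message returns is what you would pass as body= in basic_publish, and whatever the callback receives as body is what you would hand to decode_message.

```python
import json


def encode_message(message):
    """Serialize a dict to a UTF-8 encoded JSON byte string (hypothetical helper)."""
    return json.dumps(message).encode("utf-8")


def decode_message(body):
    """Turn a received body (bytes or text) back into a Python object (hypothetical helper)."""
    if isinstance(body, bytes):
        body = body.decode("utf-8")
    return json.loads(body)


# Simulate what the producer sends and what the consumer callback receives.
message = {'id': 1, 'name': 'name1'}
body = encode_message(message)      # this is what would go into body=...
received = decode_message(body)     # this is what the callback would do
print(received['id'])
```

Keep in mind that JSON only covers basic types (dicts with string keys, lists, strings, numbers, booleans, None), so something like a datetime would need to be converted to a string before calling json.dumps.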