RabbitMQ: How to send Python dictionary between Python producer and consumer?

Malgi · Dec 30, 2015

I'm trying to send a Python dictionary from a Python producer to a Python consumer using RabbitMQ. The producer first establishes a connection to the local RabbitMQ server, then declares the queue the message will be delivered to, and finally sends the message. The consumer connects to the RabbitMQ server and makes sure the queue exists by declaring the same queue. It then receives the message from the producer in the callback function and prints the 'id' value (1). Here are the scripts for the producer and consumer:

producer.py script:

import pika
import sys

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)

message = {'id': 1, 'name': 'name1'}
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                         delivery_mode = 2, # make message persistent
                      ))
print(" [x] Sent %r" % message)
connection.close()

consumer.py script:

import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()

channel.queue_declare(queue='task_queue', durable=True)
print(' [*] Waiting for messages. To exit press CTRL+C')

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    print(body['id'])
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback,
                      queue='task_queue')

channel.start_consuming()

But when I run producer.py, I get this error:

line 18, in <module>
    delivery_mode = 2, # make message persistent
  File "/Library/Python/2.7/site-packages/pika/adapters/blocking_connection.py", line 1978, in basic_publish
    mandatory, immediate)
  File "/Library/Python/2.7/site-packages/pika/adapters/blocking_connection.py", line 2064, in publish
    immediate=immediate)
  File "/Library/Python/2.7/site-packages/pika/channel.py", line 338, in basic_publish
    (properties, body))
  File "/Library/Python/2.7/site-packages/pika/channel.py", line 1150, in _send_method
    self.connection._send_method(self.channel_number, method_frame, content)
  File "/Library/Python/2.7/site-packages/pika/connection.py", line 1571, in _send_method
    self._send_message(channel_number, method_frame, content)
  File "/Library/Python/2.7/site-packages/pika/connection.py", line 1596, in _send_message
    content[1][s:e]).marshal())
TypeError: unhashable type

Could anybody help me? Thanks!

Answer

Turn · Dec 30, 2015

You can't send native Python types as your payload; you have to serialize them first. The message body has to be a string (or bytes), and a dict is neither. I recommend using JSON:

import json
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=json.dumps(message),
                      properties=pika.BasicProperties(
                          delivery_mode = 2, # make message persistent
                      ))

and in the consumer:

def callback(ch, method, properties, body):
    # Decode the JSON payload back into a dict before using it
    message = json.loads(body)
    print(" [x] Received %r" % message)
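
To check the serialization step on its own, without a running broker, here is a minimal sketch of the round trip. The helper names encode_message and decode_message are made up for illustration and are not part of pika; the sketch assumes the body may arrive as bytes (as it typically does on Python 3) or as a plain string.

```python
import json


def encode_message(message):
    """Serialize a dict to UTF-8 encoded JSON, suitable as a message body."""
    return json.dumps(message).encode("utf-8")


def decode_message(body):
    """Turn a received body (bytes or str) back into a Python object."""
    if isinstance(body, bytes):
        body = body.decode("utf-8")
    return json.loads(body)


# Same dictionary as in the question's producer
message = {'id': 1, 'name': 'name1'}

body = encode_message(message)    # what would be passed as body= to basic_publish
received = decode_message(body)   # what the consumer callback would do with body

print(received['id'])
```

Keep in mind that JSON only round-trips JSON-compatible data: dictionary keys come back as strings and tuples come back as lists, so this works cleanly for a simple dict like the one above.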