Sending RabbitMQ messages via websockets

bzo picture bzo · Apr 4, 2014 · Viewed 9.2k times · Source

Looking for some code samples to solve this problem :-

Would like to write some code (Python or Javascript) that would act as a subscriber to a RabbitMQ queue so that on receiving a message it would broadcast the message via websockets to any connected client.

I've looked at Autobahn and node.js (using "amqp" and "ws" ) but cannot get things to work as needed. Here's the server code in javascript using node.js:-

var amqp = require('amqp');
var WebSocketServer = require('ws').Server

var connection = amqp.createConnection({host: 'localhost'});
var wss = new WebSocketServer({port:8000});

wss.on('connection',function(ws){

    ws.on('open', function() {
        console.log('connected');
        ws.send(Date.now().toString());
    });

    ws.on('message',function(message){
            console.log('Received: %s',message);
            ws.send(Date.now().toString());
    });
});

connection.on('ready', function(){
    connection.queue('MYQUEUE', {durable:true,autoDelete:false},function(queue){
            console.log(' [*] Waiting for messages. To exit press CTRL+C')
            queue.subscribe(function(msg){
                    console.log(" [x] Received from MYQUEUE %s",msg.data.toString('utf-8'));
                    payload = msg.data.toString('utf-8');
                    // HOW DOES THIS NOW GET SENT VIA WEBSOCKETS ??
            });
    });
});

Using this code, I can successfully subscribe to a queue in Rabbit and receive any messages that are sent to the queue. Similarly, I can connect a websocket client (e.g. a browser) to the server and send/receive messages. BUT ... how can I send the payload of the Rabbit queue message as a websocket message at the point indicated ("HOW DOES THIS NOW GET SENT VIA WEBSOCKETS") ? I think it's something to do with being stuck in the wrong callback or they need to be nested somehow ...?

Alternatively, if this can be done easier in Python (via Autobahn and pika) that would be great.

Thanks !

Answer

Gabriele Santomaggio picture Gabriele Santomaggio · Apr 6, 2014

One way to implement your system is use python with tornado.

Here the server:

    import tornado.ioloop
    import tornado.web
    import tornado.websocket
    import os
    import pika
    from threading import Thread


    clients = []

    def threaded_rmq():
        connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"));
        print 'Connected:localhost'
        channel = connection.channel()
        channel.queue_declare(queue="my_queue")
        print 'Consumer ready, on my_queue'
        channel.basic_consume(consumer_callback, queue="my_queue", no_ack=True) 
        channel.start_consuming()


    def consumer_callback(ch, method, properties, body):
            print " [x] Received %r" % (body,)
            for itm in clients:
                itm.write_message(body)

    class SocketHandler(tornado.websocket.WebSocketHandler):
        def open(self):
            print "WebSocket opened"
            clients.append(self)
        def on_message(self, message):
            self.write_message(u"You said: " + message)

        def on_close(self):
            print "WebSocket closed"
            clients.remove(self)


    class MainHandler(tornado.web.RequestHandler):
        def get(self):
            print "get page"
            self.render("websocket.html")


application = tornado.web.Application([
    (r'/ws', SocketHandler),
    (r"/", MainHandler),
])

if __name__ == "__main__":
    thread = Thread(target = threaded_rmq)
    thread.start()

    application.listen(8889)
    tornado.ioloop.IOLoop.instance().start()

and here the html page:

<html>
<head>
    <script src="//code.jquery.com/jquery-1.11.0.min.js"></script>
    <script>

    $(document).ready(function() {
      var ws;
       if ('WebSocket' in window) {
           ws = new WebSocket('ws://localhost:8889/ws');
        }
        else if ('MozWebSocket' in window) {
            ws = new MozWebSocket('ws://localhost:8889/ws');
        }
        else {

              alert("<tr><td> your browser doesn't support web socket </td></tr>");

            return;
        }

      ws.onopen = function(evt) { alert("Connection open ...")};

      ws.onmessage = function(evt){
        alert(evt.data);
      };

      function closeConnect(){
          ws.close();
      }


  });
    </script>

</head>

<html>

So when you publish a message to "my_queue" the message is redirects to all web page connected.

I hope it can be useful

EDIT**

Here https://github.com/Gsantomaggio/rabbitmqexample you can find the complete example