Asyncio + aiohttp - redis Pub/Sub and websocket read/write in single handler

Michael · Jul 28, 2015 · Viewed 8.6k times

I'm currently experimenting with aiohttp to see how it performs as the server for a mobile app that talks to it over a websocket connection.

Here is a simple "Hello world" example (also available as a gist):

import asyncio
import aiohttp
from aiohttp import web


class WebsocketEchoHandler:

    @asyncio.coroutine
    def __call__(self, request):
        ws = web.WebSocketResponse()
        ws.start(request)

        print('Connection opened')
        try:
            while True:
                msg = yield from ws.receive()
                ws.send_str(msg.data + '/answer')
        except:
            pass
        finally:
            print('Connection closed')
        return ws


if __name__ == "__main__":
    app = aiohttp.web.Application()
    app.router.add_route('GET', '/ws', WebsocketEchoHandler())

    loop = asyncio.get_event_loop()
    handler = app.make_handler()

    f = loop.create_server(
        handler,
        '127.0.0.1',
        8080,
    )

    srv = loop.run_until_complete(f)
    print("Server started at {sock[0]}:{sock[1]}".format(
        sock=srv.sockets[0].getsockname()
    ))
    try:
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    finally:
        loop.run_until_complete(handler.finish_connections(1.0))
        srv.close()
        loop.run_until_complete(srv.wait_closed())
        loop.run_until_complete(app.finish())
    loop.close()

The problem

Now I would like to use the structure described in the article linked below (with the node server replaced by Python aiohttp). More specifically, I want to use the Redis Pub/Sub mechanism via asyncio-redis, so that my WebsocketEchoHandler reads from and writes to both the websocket connection and Redis.

WebsocketEchoHandler is a dead simple loop, so I'm not sure how this should be done. With Tornado and brükva I would just use callbacks.

http://goldfirestudios.com/blog/136/Horizontally-Scaling-Node.js-and-WebSockets-with-Redis

Extra (offtopic perhaps) question

Since I'm already using Redis, which of these two approaches should I take?

  1. As in a "classic" web app, have a controller/view for everything and use Redis only for messaging and the like.
  2. Make the web app just a thin layer between the client and Redis, with Redis also used as a task queue (the simplest option being Python RQ), so that every request is delegated to workers.

EDIT

Image from http://goldfirestudios.com/blog/136/Horizontally-Scaling-Node.js-and-WebSockets-with-Redis

EDIT 2

It seems that I need to clarify.

  • The websocket-only handler is shown above.
  • A Redis Pub/Sub-only handler might look like this:

    class WebsocketEchoHandler:
    
        @asyncio.coroutine
        def __call__(self, request):
            ws = web.WebSocketResponse()
            ws.start(request)
    
            connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
            subscriber = yield from connection.start_subscribe()
            yield from subscriber.subscribe(['ch1', 'ch2'])
    
            print('Connection opened')
            try:
                while True:
                    msg = yield from subscriber.next_published()
                    ws.send_str(msg.value + '/answer')
            except:
                pass
            finally:
                print('Connection closed')
            return ws
    

    This handler just subscribes to the Redis channels ch1 and ch2 and forwards every message received on those channels to the websocket.

  • I want to have this handler:

    class WebsocketEchoHandler:
    
        @asyncio.coroutine
        def __call__(self, request):
            ws = web.WebSocketResponse()
            ws.start(request)
    
            connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
            subscriber = yield from connection.start_subscribe()
            yield from subscriber.subscribe(['ch1', 'ch2'])
    
            print('Connection opened')
            try:
                while True:
                    # If a message is received from Redis OR from the websocket
                    msg_ws = yield from ws.receive()
                    msg_redis = yield from subscriber.next_published()
                    if msg_ws:
                        # push to redis / do something else
                        self.on_msg_from_ws(msg_ws)
                    if msg_redis:
                        self.on_msg_from_redis(msg_redis)
            except:
                pass
            finally:
                print('Connection closed')
            return ws
    

    But the following two lines always run sequentially, so waiting for a websocket message blocks reading from Redis:

    msg_ws = yield from ws.receive()
    msg_redis = yield from subscriber.next_published()
    

I want the handler to be event-driven: it should react as soon as a message arrives from either of the two sources, whichever comes first.
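The blocking can be reproduced without aiohttp or Redis. The following is only a minimal sketch: two asyncio.Queue objects stand in for the websocket and the Redis subscriber, the names sequential_reader and demo are made up for the illustration, and it uses async/await (the `@asyncio.coroutine` decorator used above was removed in Python 3.11).

```python
import asyncio


async def sequential_reader(ws_queue, redis_queue, handled):
    # Mirrors the loop above: the second await is not reached
    # until the first one has returned a message.
    while True:
        msg_ws = await ws_queue.get()
        handled.append(('ws', msg_ws))
        msg_redis = await redis_queue.get()
        handled.append(('redis', msg_redis))


async def demo():
    ws_queue, redis_queue, handled = asyncio.Queue(), asyncio.Queue(), []
    # Only the "Redis" stand-in has data; the "websocket" stand-in is silent.
    redis_queue.put_nowait('published')
    reader = asyncio.ensure_future(
        sequential_reader(ws_queue, redis_queue, handled))
    await asyncio.sleep(0.05)
    # Snapshot taken while the reader is still parked on ws_queue.get().
    stuck = list(handled)
    # Once the "websocket" delivers something, the Redis message gets through.
    ws_queue.put_nowait('hello')
    await asyncio.sleep(0.05)
    reader.cancel()
    return stuck, handled


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

The already-published message is only handled after the websocket stand-in produces something, which is exactly the behaviour I want to avoid.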

Answer

dano · Jul 28, 2015

You should use two while loops: one that handles messages from the websocket, and one that handles messages from Redis. Your main handler can just kick off two coroutines, one running each loop, and then wait on both of them:

class WebsocketEchoHandler:
    @asyncio.coroutine
    def __call__(self, request):
        ws = web.WebSocketResponse()
        ws.start(request)

        connection = yield from asyncio_redis.Connection.create(host='127.0.0.1', port=6379)
        subscriber = yield from connection.start_subscribe()
        yield from subscriber.subscribe(['ch1', 'ch2'])

        print('Connection opened')
        try:
            # Kick off both coroutines in parallel, and then block
            # until both are completed.
            yield from asyncio.gather(self.handle_ws(ws), self.handle_redis(subscriber))
        except Exception:  # Don't do except: pass
            import traceback
            traceback.print_exc()
        finally:
            print('Connection closed')
        return ws

    @asyncio.coroutine
    def handle_ws(self, ws):
        while True:
            msg_ws = yield from ws.receive()
            if msg_ws:
                self.on_msg_from_ws(msg_ws)

    @asyncio.coroutine
    def handle_redis(self, subscriber):
        while True:
            msg_redis = yield from subscriber.next_published()
            if msg_redis:
                self.on_msg_from_redis(msg_redis)

This way each loop reads from its own source independently, so a quiet websocket never holds up messages from Redis, and vice versa.
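One thing to keep in mind with asyncio.gather: if one of the awaitables raises, the exception is propagated but the other awaitable is not cancelled, and if one simply returns, gather keeps waiting for the other. So when the websocket closes, the Redis loop may keep running. A possible way to handle that, shown here only as a sketch, is asyncio.wait with return_when=FIRST_COMPLETED followed by cancelling whatever is left. The example is written with async/await and uses asyncio.Queue objects as stand-ins for the websocket and the subscriber; pump, handle_connection and the None "closed" sentinel are hypothetical names, not part of aiohttp or asyncio_redis.

```python
import asyncio


async def pump(source, on_message):
    # One loop per source: wait for the next message, then dispatch it.
    while True:
        msg = await source.get()
        if msg is None:  # hypothetical sentinel meaning "this source closed"
            return
        on_message(msg)


async def handle_connection(ws_source, redis_source, on_ws, on_redis):
    tasks = [
        asyncio.ensure_future(pump(ws_source, on_ws)),
        asyncio.ensure_future(pump(redis_source, on_redis)),
    ]
    try:
        # Resume as soon as either loop finishes or raises.
        done, pending = await asyncio.wait(
            tasks, return_when=asyncio.FIRST_COMPLETED)
        for task in done:
            task.result()  # re-raises the loop's exception, if it had one
    finally:
        # Stop the loop that is still running and wait for it to unwind.
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)


async def demo():
    ws_source, redis_source, seen = asyncio.Queue(), asyncio.Queue(), []
    redis_source.put_nowait('from redis')
    ws_source.put_nowait('from ws')
    ws_source.put_nowait(None)  # the "websocket" side closes
    await handle_connection(
        ws_source, redis_source,
        lambda m: seen.append(('ws', m)),
        lambda m: seen.append(('redis', m)),
    )
    return seen


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

In the real handler the two pump calls would be replaced by self.handle_ws(ws) and self.handle_redis(subscriber), with handle_ws returning once the websocket reports that it has closed.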