RuntimeWarning: coroutine was never awaited. How to async / await a callback

Ruben · Aug 12, 2019

I have a class that serves WebSockets and listens to PostgreSQL. Using asyncpg, when I try to use add_listener, I get the warning "RuntimeWarning: coroutine was never awaited". How do I async / await a callback? I tried adding "await self.listener", but it doesn't work.

Is there a way to handle this in another way?

import asyncio
import http
import websockets
import asyncpg

class App(object):

    def __init__(self, loop):
        self.loop = loop
        self.ws_list = []
        self.conn = None

    async def ws_handler(self, ws, path):
        if self.conn is None:
            self.conn = await asyncpg.connect(user='xxx', password='xxx', database='pgws', host='127.0.0.1')
            await self.conn.add_listener('todo_updates', self.listener)
        print('new socket!!!')
        self.ws_list.append(ws)
        while True:
            await asyncio.sleep(1)

    async def listener(self, conn, pid, channel, payload):
        print(payload)
        for ws in self.ws_list:
            task = asyncio.create_task()
            await ws.send(payload)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    app = App(loop)
    start_server = websockets.serve(app.ws_handler, 'localhost', 8766)
    app.loop.run_until_complete(start_server)
    app.loop.run_forever()

Answer

Tobotimus · Aug 13, 2019

The problem is that the callback you're passing to asyncpg.Connection.add_listener() is a coroutine function, but it should be a plain synchronous function. asyncpg doesn't raise an error because technically it's still a callable which takes a connection, pid, channel and payload, but it doesn't behave as you expect when it's called: calling a coroutine function only creates a coroutine object, and since nothing ever awaits that object, its body never runs and Python emits the RuntimeWarning.
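The effect can be reproduced without asyncpg. The following is a minimal sketch, not asyncpg's actual implementation: fire_callback is a hypothetical stand-in for any library that invokes a callback synchronously, and the listener names are made up for illustration.

```python
import inspect

calls = []


async def async_listener(payload):
    # Coroutine function: calling it does NOT run this body.
    calls.append(payload)


def sync_listener(payload):
    # Plain function: the body runs as soon as it is called.
    calls.append(payload)


def fire_callback(callback, payload):
    """Hypothetical stand-in for a library calling a callback synchronously."""
    return callback(payload)


result = fire_callback(async_listener, "a")
is_coro = inspect.iscoroutine(result)  # a coroutine object came back
ran_async = "a" in calls               # the body itself never executed

# Close the coroutine explicitly so this demo does not itself trigger
# "RuntimeWarning: coroutine ... was never awaited".
result.close()

fire_callback(sync_listener, "b")
ran_sync = "b" in calls
```

Here the coroutine function returns an un-run coroutine object, whereas the plain function does its work immediately. That is why the original listener never printed or sent anything.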

To call an asynchronous function from within a synchronous callback (whilst the event loop is already running), you need to use something like asyncio.create_task() (in Python >=3.7) or loop.create_task() (in Python >=3.4.2) or asyncio.ensure_future() (in Python >=3.4.4), like so:

class App:
    ...  # Your other code here
    def listener(self, conn, pid, channel, payload):
        print(payload)
        for ws in self.ws_list:
            asyncio.create_task(ws.send(payload))

Be aware that asyncio.create_task() (and the other functions mentioned above) returns straight away and does not wait for the task to finish. The task is only scheduled; it runs once the event loop regains control, after one or more awaits elsewhere.
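The ordering can be observed in a small self-contained sketch (Python 3.7 or later). The names send, listener and order are illustrative only and stand in for ws.send() and the asyncpg callback; no real sockets or database are involved.

```python
import asyncio

order = []


async def send(payload):
    # Stands in for ws.send(payload).
    order.append(f"sent {payload}")


def listener(payload):
    # Synchronous callback: schedule the coroutine, do not wait for it.
    task = asyncio.create_task(send(payload))
    order.append("scheduled")
    return task


async def main():
    task = listener("hello")
    order.append("listener returned")
    # Awaiting yields control to the event loop, which lets the task run.
    await task
    return order


result = asyncio.run(main())
```

The listener returns before send() has started, and the send happens only when main() awaits. In a long-running server it may also be worth keeping a reference to each created task (for example in a set), since the asyncio documentation recommends saving the result of create_task() so the task cannot disappear mid-execution.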