How does asyncio actually work?

wvxvw picture wvxvw · Feb 27, 2018 · Viewed 24k times · Source

This question is motivated by my another question: How to await in cdef?

There are tons of articles and blog posts on the web about asyncio, but they are all very superficial. I couldn't find any information about how asyncio is actually implemented, and what makes I/O asynchronous. I was trying to read the source code, but it's thousands of lines of not the highest grade C code, a lot of which deals with auxiliary objects, but most crucially, it is hard to connect between Python syntax and what C code it would translate into.

Asycnio's own documentation is even less helpful. There's no information there about how it works, only some guidelines about how to use it, which are also sometimes misleading / very poorly written.

I'm familiar with Go's implementation of coroutines, and was kind of hoping that Python did the same thing. If that was the case, the code I came up in the post linked above would have worked. Since it didn't, I'm now trying to figure out why. My best guess so far is as follows, please correct me where I'm wrong:

  1. Procedure definitions of the form async def foo(): ... are actually interpreted as methods of a class inheriting coroutine.
  2. Perhaps, async def is actually split into multiple methods by await statements, where the object, on which these methods are called is able to keep track of the progress it made through the execution so far.
  3. If the above is true, then, essentially, execution of a coroutine boils down to calling methods of coroutine object by some global manager (loop?).
  4. The global manager is somehow (how?) aware of when I/O operations are performed by Python (only?) code and is able to choose one of the pending coroutine methods to execute after the current executing method relinquished control (hit on the await statement).

In other words, here's my attempt at "desugaring" of some asyncio syntax into something more understandable:

async def coro(name):
    print('before', name)
    await asyncio.sleep()
    print('after', name)

asyncio.gather(coro('first'), coro('second'))

# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__():
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)


# translated from asyncio.gather()
class AsyncIOManager:

    def gather(*coros):
        while not every(c.done() for c in coros):
            coro = random.choice(coros)
            coro()

Should my guess prove correct: then I have a problem. How does I/O actually happen in this scenario? In a separate thread? Is the whole interpreter suspended and I/O happens outside the interpreter? What exactly is meant by I/O? If my python procedure called C open() procedure, and it in turn sent interrupt to kernel, relinquishing control to it, how does Python interpreter know about this and is able to continue running some other code, while kernel code does the actual I/O and until it wakes up the Python procedure which sent the interrupt originally? How can Python interpreter in principle, be aware of this happening?

Answer

Bharel picture Bharel · Jun 30, 2018

How does asyncio work?

Before answering this question we need to understand a few base terms, skip these if you already know any of them.

Generators

Generators are objects that allow us to suspend the execution of a python function. User curated generators are implement using the keyword yield. By creating a normal function containing the yield keyword, we turn that function into a generator:

>>> def test():
...     yield 1
...     yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
StopIteration

As you can see, calling next() on the generator causes the interpreter to load test's frame, and return the yielded value. Calling next() again, cause the frame to load again into the interpreter stack, and continue on yielding another value.

By the third time next() is called, our generator was finished, and StopIteration was thrown.

Communicating with a generator

A less-known feature of generators, is the fact that you can communicate with them using two methods: send() and throw().

>>> def test():
...     val = yield 1
...     print(val)
...     yield 2
...     yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in test
Exception

Upon calling gen.send(), the value is passed as a return value from the yield keyword.

gen.throw() on the other hand, allows throwing Exceptions inside generators, with the exception raised at the same spot yield was called.

Returning values from generators

Returning a value from a generator, results in the value being put inside the StopIteration exception. We can later on recover the value from the exception and use it to our need.

>>> def test():
...     yield 1
...     return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
...     next(gen)
... except StopIteration as exc:
...     print(exc.value)
...
abc

Behold, a new keyword: yield from

Python 3.4 came with the addition of a new keyword: yield from. What that keyword allows us to do, is pass on any next(), send() and throw() into an inner-most nested generator. If the inner generator returns a value, it is also the return value of yield from:

>>> def inner():
...     inner_result = yield 2
...     print('inner', inner_result)
...     return 3
...
>>> def outer():
...     yield 1
...     val = yield from inner()
...     print('outer', val)
...     yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4

I've written an article to further elaborate on this topic.

Putting it all together

Upon introducing the new keyword yield from in Python 3.4, we were now able to create generators inside generators that just like a tunnel, pass the data back and forth from the inner-most to the outer-most generators. This has spawned a new meaning for generators - coroutines.

Coroutines are functions that can be stopped and resumed while being run. In Python, they are defined using the async def keyword. Much like generators, they too use their own form of yield from which is await. Before async and await were introduced in Python 3.5, we created coroutines in the exact same way generators were created (with yield from instead of await).

async def inner():
    return 1

async def outer():
    await inner()

Like every iterator or generator that implement the __iter__() method, coroutines implement __await__() which allows them to continue on every time await coro is called.

There's a nice sequence diagram inside the Python docs that you should check out.

In asyncio, apart from coroutine functions, we have 2 important objects: tasks and futures.

Futures

Futures are objects that have the __await__() method implemented, and their job is to hold a certain state and result. The state can be one of the following:

  1. PENDING - future does not have any result or exception set.
  2. CANCELLED - future was cancelled using fut.cancel()
  3. FINISHED - future was finished, either by a result set using fut.set_result() or by an exception set using fut.set_exception()

The result, just like you have guessed, can either be a Python object, that will be returned, or an exception which may be raised.

Another important feature of future objects, is that they contain a method called add_done_callback(). This method allows functions to be called as soon as the task is done - whether it raised an exception or finished.

Tasks

Task objects are special futures, which wrap around coroutines, and communicate with the inner-most and outer-most coroutines. Every time a coroutine awaits a future, the future is passed all the way back to the task (just like in yield from), and the task receives it.

Next, the task binds itself to the future. It does so by calling add_done_callback() on the future. From now on, if the future will ever be done, by either being cancelled, passed an exception or passed a Python object as a result, the task's callback will be called, and it will rise back up to existence.

Asyncio

The final burning question we must answer is - how is the IO implemented?

Deep inside asyncio, we have an event loop. An event loop of tasks. The event loop's job is to call tasks every time they are ready and coordinate all that effort into one single working machine.

The IO part of the event loop is built upon a single crucial function called select. Select is a blocking function, implemented by the operating system underneath, that allows waiting on sockets for incoming or outgoing data. Upon data being received it wakes up, and returns the sockets which received data, or the sockets whom are ready for writing.

When you try to receive or send data over a socket through asyncio, what actually happens below is that the socket is first checked if it has any data that can be immediately read or sent. If its .send() buffer is full, or the .recv() buffer is empty, the socket is registered to the select function (by simply adding it to one of the lists, rlist for recv and wlist for send) and the appropriate function awaits a newly created future object, tied to that socket.

When all available tasks are waiting for futures, the event loop calls select and waits. When the one of the sockets has incoming data, or its send buffer drained up, asyncio checks for the future object tied to that socket, and sets it to done.

Now all the magic happens. The future is set to done, the task that added itself before with add_done_callback() rises up back to life, and calls .send() on the coroutine which resumes the inner-most coroutine (because of the await chain) and you read the newly received data from a nearby buffer it was spilled unto.

Method chain again, in case of recv():

  1. select.select waits.
  2. A ready socket, with data is returned.
  3. Data from the socket is moved into a buffer.
  4. future.set_result() is called.
  5. Task that added itself with add_done_callback() is now woken up.
  6. Task calls .send() on the coroutine which goes all the way into the inner-most coroutine and wakes it up.
  7. Data is being read from the buffer and returned to our humble user.

In summary, asyncio uses generator capabilities, that allow pausing and resuming functions. It uses yield from capabilities that allow passing data back and forth from the inner-most generator to the outer-most. It uses all of those in order to halt function execution while it's waiting for IO to complete (by using the OS select function).

And the best of all? While one function is paused, another may run and interleave with the delicate fabric, which is asyncio.