This question is motivated by another question of mine: How to await in cdef?
There are tons of articles and blog posts on the web about asyncio, but they are all very superficial. I couldn't find any information about how asyncio is actually implemented, and what makes I/O asynchronous. I was trying to read the source code, but it's thousands of lines of not the highest grade C code, a lot of which deals with auxiliary objects, but most crucially, it is hard to connect between the Python syntax and the C code it translates into.
Asyncio's own documentation is even less helpful. There's no information there about how it works, only some guidelines about how to use it, which are also sometimes misleading / very poorly written.
I'm familiar with Go's implementation of coroutines, and was kind of hoping that Python did the same thing. If that was the case, the code I came up with in the post linked above would have worked. Since it didn't, I'm now trying to figure out why. My best guess so far is as follows; please correct me where I'm wrong:
- async def foo(): ... functions are actually interpreted as methods of a class inheriting coroutine.
- async def is actually split into multiple methods by its await statements, where the object on which these methods are called is able to keep track of the progress it has made through the execution so far.
- Some global manager then picks which pending coroutine method to execute next, after the currently executing one relinquishes control (hits an await statement).
In other words, here's my attempt at "desugaring" some asyncio syntax into something more understandable:
async def coro(name):
    print('before', name)
    await asyncio.sleep(1)
    print('after', name)

asyncio.gather(coro('first'), coro('second'))
# translated from async def coro(name)
class Coro(coroutine):
    def before(self, name):
        print('before', name)

    def after(self, name):
        print('after', name)

    def __init__(self, name):
        self.name = name
        self.parts = self.before, self.after
        self.pos = 0

    def __call__(self):
        self.parts[self.pos](self.name)
        self.pos += 1

    def done(self):
        return self.pos == len(self.parts)
# translated from asyncio.gather()
class AsyncIOManager:
    def gather(*coros):
        while not all(c.done() for c in coros):
            coro = random.choice(coros)
            coro()
Should my guess prove correct, then I have a problem. How does I/O actually happen in this scenario? In a separate thread? Is the whole interpreter suspended, and I/O happens outside the interpreter? What exactly is meant by I/O? If my Python procedure calls the C open() procedure, and it in turn sends an interrupt to the kernel, relinquishing control to it, how does the Python interpreter know about this, and how is it able to continue running some other code while the kernel code does the actual I/O, until it wakes up the Python procedure which sent the interrupt originally? How can the Python interpreter, in principle, be aware of this happening?
Before answering this question we need to understand a few basic terms; skip these if you already know any of them.
Generators
Generators are objects that allow us to suspend the execution of a Python function. User-defined generators are implemented using the keyword yield. By creating a normal function containing the yield keyword, we turn that function into a generator:
>>> def test():
... yield 1
... yield 2
...
>>> gen = test()
>>> next(gen)
1
>>> next(gen)
2
>>> next(gen)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
As you can see, calling next() on the generator causes the interpreter to load test's frame and return the yielded value. Calling next() again causes the frame to be loaded into the interpreter stack once more, where it continues on to yield another value.
By the third time next() is called, our generator is finished, and StopIteration is raised.
A less-known feature of generators is the fact that you can communicate with them using two methods: send() and throw().
>>> def test():
... val = yield 1
... print(val)
... yield 2
... yield 3
...
>>> gen = test()
>>> next(gen)
1
>>> gen.send("abc")
abc
2
>>> gen.throw(Exception())
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 4, in test
Exception
Upon calling gen.send(), the value is passed as the return value of the yield keyword.
gen.throw(), on the other hand, allows throwing exceptions inside generators, with the exception raised at the spot where yield was called.
Returning a value from a generator results in the value being put inside the StopIteration exception. We can later recover the value from the exception and use it for our needs.
>>> def test():
... yield 1
... return "abc"
...
>>> gen = test()
>>> next(gen)
1
>>> try:
... next(gen)
... except StopIteration as exc:
... print(exc.value)
...
abc
yield from
Python 3.3 came with the addition of a new keyword: yield from. What that keyword allows us to do is pass any next(), send() and throw() on into an innermost nested generator. If the inner generator returns a value, it is also the return value of yield from:
>>> def inner():
... inner_result = yield 2
... print('inner', inner_result)
... return 3
...
>>> def outer():
... yield 1
... val = yield from inner()
... print('outer', val)
... yield 4
...
>>> gen = outer()
>>> next(gen)
1
>>> next(gen) # Goes inside inner() automatically
2
>>> gen.send("abc")
inner abc
outer 3
4
I've written an article to further elaborate on this topic.
Upon the introduction of the keyword yield from in Python 3.3, we were able to create generators inside generators that, just like a tunnel, pass data back and forth from the innermost to the outermost generator. This gave generators a new meaning: coroutines.
Coroutines
Coroutines are functions that can be stopped and resumed while being run. In Python, they are defined using the async def keyword. Much like generators, they use their own form of yield from, which is await. Before async and await were introduced in Python 3.5, we created coroutines in the exact same way generators were created (with yield from instead of await).
async def inner():
    return 1

async def outer():
    await inner()
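For illustration, the same two coroutines in the old, generator-based style would look roughly like this. Note this sketch only runs on older Pythons, since asyncio.coroutine was deprecated in 3.8 and removed in 3.11:

import asyncio

# Pre-3.5 style: generator functions marked as coroutines by a decorator,
# delegating with yield from instead of await.
@asyncio.coroutine
def inner():
    return 1

@asyncio.coroutine
def outer():
    result = yield from inner()
    return result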
Like every iterator or generator that implements the __iter__() method, coroutines implement __await__(), which allows them to continue every time await coro is called.
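As a rough sketch of the protocol - the Suspend class below is made up for illustration - any object whose __await__() returns an iterator can be awaited, and each yield inside it suspends the entire coroutine chain:

class Suspend:
    """A made-up awaitable: yielding from __await__ suspends the caller."""
    def __await__(self):
        value = yield 'suspended'  # travels up to whoever drives the coroutine
        return value               # becomes the result of `await Suspend()`

async def main():
    result = await Suspend()
    print('resumed with', result)

coro = main()
print(coro.send(None))    # -> suspended (the yielded value surfaces here)
try:
    coro.send('hello')    # resume; prints 'resumed with hello'
except StopIteration:
    pass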
There's a nice sequence diagram inside the Python docs that you should check out.
In asyncio, apart from coroutine functions, we have two important objects: tasks and futures.
Futures are objects that have the __await__() method implemented, and their job is to hold a certain state and result. The state can be one of the following:
- PENDING - the future does not have any result or exception set.
- CANCELLED - the future was cancelled using fut.cancel().
- FINISHED - the future was finished, either by a result set using fut.set_result() or by an exception set using fut.set_exception().
The result, just as you might have guessed, can either be a Python object that will be returned, or an exception that may be raised.
Another important feature of future objects is that they contain a method called add_done_callback(). This method allows functions to be called as soon as the future is done - whether it raised an exception or finished.
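A short, runnable example of both features, using asyncio's real Future API (asyncio.run() is available since Python 3.7):

import asyncio

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()          # state: PENDING

    # Called as soon as the future is done, however it finished.
    fut.add_done_callback(lambda f: print('callback:', f.result()))

    # Resolve the future shortly, from within the event loop.
    loop.call_later(0.1, fut.set_result, 'hello')  # state becomes FINISHED

    print('awaited:', await fut)        # suspends until set_result()

asyncio.run(main())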
Task objects are special futures which wrap around coroutines and communicate with the innermost and outermost coroutines. Every time a coroutine awaits a future, the future is passed all the way back to the task (just like in yield from), and the task receives it.
Next, the task binds itself to the future by calling add_done_callback() on it. From now on, if the future is ever done - by being cancelled, or passed an exception, or passed a Python object as a result - the task's callback will be called, and it will rise back up into existence.
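To make that concrete, here is a toy sketch of the task mechanism - not asyncio's actual code - assuming a future object that (like asyncio.Future) yields itself from __await__() and exposes add_done_callback() and result():

# A toy task: resume the wrapped coroutine, and whenever it awaits a
# future, park on that future until its done-callback fires.
class ToyTask:
    def __init__(self, coro):
        self.coro = coro
        self._step(None)                # start the coroutine immediately

    def _step(self, value):
        try:
            # Resume; the coroutine runs until it awaits a future,
            # which yields itself all the way up the await chain.
            fut = self.coro.send(value)
        except StopIteration:
            return                      # coroutine finished
        # Bind the task to the future: when the future completes,
        # resume the coroutine with the future's result.
        fut.add_done_callback(lambda fut: self._step(fut.result()))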
The final burning question we must answer is - how is the IO implemented?
Deep inside asyncio, we have an event loop. An event loop of tasks. The event loop's job is to call tasks every time they are ready and coordinate all that effort into one single working machine.
The IO part of the event loop is built upon a single crucial function called select. Select is a blocking function, implemented by the operating system underneath, that allows waiting on sockets for incoming or outgoing data. Upon receiving data it wakes up, and returns the sockets which received data, or the sockets which are ready for writing.
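Here is select used directly, outside asyncio; a socketpair() makes the example self-contained:

import select
import socket

# A connected pair of sockets, so we control both ends.
a, b = socket.socketpair()

rlist, wlist, _ = select.select([a], [a], [], 0)
print(a in rlist)   # False: nothing to read yet
print(a in wlist)   # True: the send buffer has room

b.sendall(b'ping')                        # now `a` has incoming data
rlist, _, _ = select.select([a], [], [])  # returns immediately
print(a in rlist)   # True: recv() will not block
print(a.recv(4))    # b'ping'

a.close()
b.close()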
When you try to receive or send data over a socket through asyncio, what actually happens below is that the socket is first checked for whether it has any data that can be immediately read or sent. If its .send() buffer is full, or the .recv() buffer is empty, the socket is registered with the select function (by simply adding it to one of the lists, rlist for recv and wlist for send) and the appropriate function awaits a newly created future object, tied to that socket.
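You can see this pattern through asyncio's public API: loop.sock_recv() registers the socket with the loop and awaits exactly such a future. The host and request below are just an example:

import asyncio
import socket

async def fetch(host):
    loop = asyncio.get_running_loop()
    sock = socket.create_connection((host, 80))
    sock.setblocking(False)             # required for use with the loop

    request = f'GET / HTTP/1.0\r\nHost: {host}\r\n\r\n'.encode()
    await loop.sock_sendall(sock, request)  # may suspend if the buffer is full

    # Suspends on a future tied to this socket until the loop's selector
    # (select, or a modern equivalent like epoll/kqueue) reports it readable.
    data = await loop.sock_recv(sock, 4096)
    sock.close()
    return data

print(asyncio.run(fetch('example.com'))[:60])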
When all available tasks are waiting for futures, the event loop calls select and waits. When one of the sockets has incoming data, or its send buffer has drained, asyncio checks for the future object tied to that socket, and sets it to done.
Now all the magic happens. The future is set to done, the task that added itself before with add_done_callback() rises back to life, and calls .send() on the coroutine, which resumes the innermost coroutine (because of the await chain), and you read the newly received data from a nearby buffer it was spilled into.
The method chain again, in the case of recv():
- select.select waits.
- A ready socket with data is returned.
- The data from the socket is moved into a buffer.
- future.set_result() is called.
- The task that added itself with add_done_callback() is now woken up.
- The task calls .send() on the coroutine, which goes all the way into the innermost coroutine and wakes it up.
- The data is read from the buffer and returned to our humble user.
In summary, asyncio uses generator capabilities, which allow pausing and resuming functions. It uses yield from capabilities, which allow passing data back and forth from the innermost generator to the outermost. It uses all of those in order to halt function execution while it's waiting for IO to complete (by using the OS select function).
And the best of all? While one function is paused, another may run and interleave with the delicate fabric, which is asyncio.
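Putting it all together, here is a minimal sketch of the whole machine - a toy future, a toy select-driven loop, and a coroutine suspended and woken by socket readiness. Every name here (ToyFuture, ToyLoop) is made up for illustration; asyncio's real versions handle exceptions, cancellation, timers and much more:

import select
import socket

class ToyFuture:
    """Holds a result and callbacks; yields itself to suspend the awaiter."""
    def __init__(self):
        self.result = None
        self.callbacks = []

    def set_result(self, result):
        self.result = result
        for callback in self.callbacks:
            callback(self)

    def __await__(self):
        yield self           # suspend, handing the future up to the loop
        return self.result

class ToyLoop:
    """Drives a coroutine with .send() and waits on sockets with select()."""
    def __init__(self):
        self.readers = {}    # socket -> future to resolve when readable

    def run(self, coro):
        def step(value):
            try:
                fut = coro.send(value)           # run until the next await
            except StopIteration:
                return
            fut.callbacks.append(lambda f: step(f.result))

        step(None)
        while self.readers:
            rlist, _, _ = select.select(list(self.readers), [], [])
            for sock in rlist:
                fut = self.readers.pop(sock)
                fut.set_result(sock.recv(4096))  # wakes the waiting coroutine

loop = ToyLoop()

async def main():
    a, b = socket.socketpair()
    fut = ToyFuture()
    loop.readers[a] = fut    # "register the socket with select"
    b.sendall(b'hello')      # makes `a` readable
    print(await fut)         # suspends here, then prints b'hello'
    a.close()
    b.close()

loop.run(main())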