From concurrent.futures to asyncio

guettli · Jul 29, 2016

I have two problems with concurrent.futures:

How to break time.sleep() in a python concurrent.futures?

Conclusion: time.sleep() cannot be interrupted. One workaround is to write a loop around it that does short sleeps and checks between them whether it should stop.

See How to break time.sleep() in a python concurrent.futures
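The loop of short sleeps mentioned above could look roughly like the following. This is only a sketch: the names interruptible_sleep, stop_event and step are made up for illustration, and a threading.Event is assumed as the way to signal the worker.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


def interruptible_sleep(seconds, stop_event, step=0.05):
    """Sleep up to `seconds` in short slices; return False if interrupted."""
    deadline = time.monotonic() + seconds
    while not stop_event.is_set():
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            return True  # slept for the full duration
        time.sleep(min(step, remaining))
    return False  # stop_event was set before the deadline


def demo():
    stop_event = threading.Event()
    with ThreadPoolExecutor(max_workers=1) as pool:
        future = pool.submit(interruptible_sleep, 60, stop_event)
        time.sleep(0.2)
        stop_event.set()  # ask the worker to wake up early
        return future.result(timeout=5)


if __name__ == "__main__":
    print(demo())
```

If an Event is used anyway, stop_event.wait(seconds) would probably be simpler than polling, since it blocks until the event is set or the timeout expires.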

Individual timeouts for concurrent.futures?

Conclusion: individual timeouts need to be implemented by the user. For example, you can call wait() once for each timeout.

See Individual timeouts for concurrent.futures
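One way to do that with concurrent.futures.wait() might be the sketch below. The helper name run_with_individual_timeouts and the (callable, timeout) job format are assumptions for illustration. Note that a job that times out keeps running in its worker thread; only the waiting is given up, and the executor still waits for it on shutdown.

```python
import time
from concurrent.futures import ThreadPoolExecutor, wait


def run_with_individual_timeouts(jobs):
    """jobs is a list of (callable, timeout_seconds) pairs.

    Returns one entry per job: its result, or None if it missed its timeout.
    """
    start = time.monotonic()
    results = [None] * len(jobs)
    with ThreadPoolExecutor(max_workers=max(1, len(jobs))) as pool:
        futures = [pool.submit(fn) for fn, _ in jobs]
        # Check the earliest deadline first, so a job that finishes after
        # its own deadline is not accepted while waiting for another one.
        order = sorted(range(len(jobs)), key=lambda i: jobs[i][1])
        for i in order:
            remaining = max(0.0, start + jobs[i][1] - time.monotonic())
            done, _ = wait([futures[i]], timeout=remaining)
            if done:
                results[i] = futures[i].result()
    return results


def fast():
    time.sleep(0.05)
    return "fast"


def slow():
    time.sleep(0.5)
    return "slow"


if __name__ == "__main__":
    print(run_with_individual_timeouts([(fast, 1.0), (slow, 0.1)]))
```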

Question

Does asyncio solve these problems?

Answer

deceze · Jul 29, 2016

In the asyncio model, execution is scheduled and coordinated by an event loop. To cancel a currently suspended task, the loop essentially just has to not resume it. In practice it works a little differently (the task is resumed with a CancelledError raised at the point where it was suspended), but this should make it clear why cancelling a suspended task is simple in theory.

Individual timeouts work the same way: whenever you suspend a coroutine to wait for a result, you can supply a timeout value. The event loop cancels the waiting task if it has not completed by the time the timeout is reached.

Some concrete samples:

>>> import asyncio
>>> loop = asyncio.get_event_loop()
>>> task = asyncio.ensure_future(asyncio.sleep(5))
>>> task.cancel()
>>> loop.run_until_complete(task)
Traceback (most recent call last):
   ...
concurrent.futures._base.CancelledError
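As a script rather than a REPL session, the same cancellation could be sketched like this. It assumes a Python version that provides asyncio.run(); the function name main is arbitrary.

```python
import asyncio


async def main():
    # Schedule a long sleep as a task; it suspends at its await point.
    task = asyncio.ensure_future(asyncio.sleep(5))
    await asyncio.sleep(0)  # let the task start and suspend
    task.cancel()           # request cancellation of the suspended task
    try:
        await task
    except asyncio.CancelledError:
        # Awaiting a cancelled task raises CancelledError in the waiter.
        return task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(main()))
```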

In practice, this might be implemented using something like this:

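import asyncio

class Foo:
    task = None

    async def sleeper(self):
        # Wrap the coroutine in a Task so that it has a cancel() method.
        self.task = asyncio.ensure_future(asyncio.sleep(60))
        try:
            await self.task
        except asyncio.CancelledError:
            # Handle the cancellation (clean up) here.
            raise NotImplementedError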

While this method is asleep, somebody else can call foo.task.cancel() to wake up the coroutine and let it handle the cancellation. Alternatively, whoever runs sleeper() as a task can cancel that outer task directly; the CancelledError is then raised at the await inside sleeper().
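To see the wake-up from outside in action, here is a small self-contained sketch. The class name Sleeper and its events list are hypothetical and exist only to record what happened; the inner sleep is wrapped in a task so that it has a cancel() method.

```python
import asyncio


class Sleeper:
    def __init__(self):
        self.task = None
        self.events = []

    async def sleep(self):
        self.task = asyncio.ensure_future(asyncio.sleep(60))
        try:
            await self.task
            self.events.append("finished")
        except asyncio.CancelledError:
            # The inner sleep was cancelled; clean up here.
            self.events.append("cancelled, cleaning up")


async def main():
    sleeper = Sleeper()
    runner = asyncio.ensure_future(sleeper.sleep())
    await asyncio.sleep(0.01)  # give sleep() time to reach its await
    sleeper.task.cancel()      # wake the coroutine up from outside
    await runner               # sleep() handled the cancellation itself
    return sleeper.events


if __name__ == "__main__":
    print(asyncio.run(main()))
```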

Setting timeouts is similarly easy:

>>> loop.run_until_complete(asyncio.wait_for(asyncio.sleep(60), 5))
[ ... 5 seconds later ... ]
Traceback (most recent call last):
   ...
concurrent.futures._base.TimeoutError
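Giving several coroutines their own timeouts could then look something like this sketch. The helpers job and with_timeout are invented for the example; only asyncio.wait_for, asyncio.gather and asyncio.run come from the standard library.

```python
import asyncio


async def job(name, duration):
    await asyncio.sleep(duration)
    return name


async def with_timeout(coro, timeout):
    # Each call gets its own timeout; None marks a job that timed out.
    try:
        return await asyncio.wait_for(coro, timeout)
    except asyncio.TimeoutError:
        return None


async def main():
    return await asyncio.gather(
        with_timeout(job("fast", 0.05), 1.0),
        with_timeout(job("slow", 5.0), 0.1),
    )


if __name__ == "__main__":
    print(asyncio.run(main()))  # ['fast', None]
```

Unlike the thread-based case, the timed-out coroutine is actually cancelled here, so it does not keep running in the background.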

Particularly in the context of HTTP request timeouts, see aiohttp:

import asyncio
import aiohttp

async def fetch_page(session, url):
    # aiohttp 3.x API; older releases used the aiohttp.Timeout context manager.
    timeout = aiohttp.ClientTimeout(total=10)
    async with session.get(url, timeout=timeout) as response:
        assert response.status == 200
        return await response.read()

async def main():
    async with aiohttp.ClientSession() as session:
        return await fetch_page(session, 'http://python.org')

content = asyncio.run(main())

Each call to fetch_page can decide on its own timeout value, and each individual call raises its own exception when its timeout is reached.