await for any future asyncio

Mirac7 · Sep 9, 2016

I'm trying to use asyncio to handle concurrent network I/O. A very large number of functions are scheduled at a single point, and the time each one takes to complete varies greatly. The received data is then processed in a separate process for each output.

The order in which the data is processed does not matter, so given the potentially very long wait for output, I'd like to await whichever future finishes first instead of awaiting them in a predefined order.

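import asyncio
import random
import time

def fetch(x):
    # Placeholder for a blocking request; the random duration is arbitrary.
    time.sleep(random.random())

async def main():
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(None, fetch, x) for x in range(50)]
    for f in futures:
        await f  # awaited in submission order, not completion order

asyncio.run(main())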

Normally, awaiting the futures in the order in which they were queued is fine:

[Figure: profiler graph for well-behaved functions]

Blue represents the time each task spends in the executor's queue, i.e. run_in_executor has been called but the function has not yet started, because the executor runs only 5 tasks simultaneously. Green is the time spent executing the function itself, and red is the time spent waiting for all previous futures to be awaited.

[Figure: profiler graph for volatile functions]

In my case, where function durations vary greatly, a lot of time is lost waiting for earlier futures in the queue to be awaited, while I could be processing the GET output locally. This leaves my system idle for a while, only to be overwhelmed when several outputs complete simultaneously, after which it goes back to idling while waiting for more requests to finish.

Is there a way to await whichever future completes first in the executor?

Answer

Andrew Svetlov · Sep 9, 2016

Looks like you are looking for asyncio.wait with return_when=asyncio.FIRST_COMPLETED.

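import asyncio
import random
import time

def fetch(x):
    # Placeholder for a blocking request; the random duration is arbitrary.
    time.sleep(random.random())

async def main():
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(None, fetch, x) for x in range(50)]
    while futures:
        # Returns as soon as at least one future is done; the rest stay pending.
        # (The loop= argument was removed from asyncio.wait in Python 3.10.)
        done, futures = await asyncio.wait(
            futures, return_when=asyncio.FIRST_COMPLETED)
        for f in done:
            await f  # already complete, so this returns immediately

asyncio.run(main())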
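
To see the completion-order behaviour directly, here is a minimal sketch that collects results as they arrive. The fixed delays, the return value of fetch, and the helper names collect_with_wait and collect_with_as_completed are assumptions made for illustration; they are not part of the question or the answer above. The second helper uses asyncio.as_completed, which is an alternative to the asyncio.wait loop rather than something the answer proposed.

```python
import asyncio
import time


def fetch(delay):
    # Hypothetical stand-in for a blocking request: sleep, then return the delay.
    time.sleep(delay)
    return delay


async def collect_with_wait(delays):
    # Same pattern as the answer: repeatedly wait for the first finished future.
    loop = asyncio.get_running_loop()
    pending = {loop.run_in_executor(None, fetch, d) for d in delays}
    results = []
    while pending:
        done, pending = await asyncio.wait(
            pending, return_when=asyncio.FIRST_COMPLETED)
        for f in done:
            results.append(f.result())  # f is finished, so result() is safe
    return results


async def collect_with_as_completed(delays):
    # Alternative: as_completed yields awaitables in the order they finish.
    loop = asyncio.get_running_loop()
    futures = [loop.run_in_executor(None, fetch, d) for d in delays]
    results = []
    for next_done in asyncio.as_completed(futures):
        results.append(await next_done)
    return results


if __name__ == "__main__":
    # Submitted slowest-first; both helpers report fastest-first.
    print(asyncio.run(collect_with_wait([0.4, 0.05, 0.2])))
    print(asyncio.run(collect_with_as_completed([0.4, 0.05, 0.2])))
```

With asyncio.wait you get the whole set of finished futures at each step, which is convenient when several complete at once. asyncio.as_completed is shorter when each result is handled individually. Either way, the place where results are appended is where the per-output processing from the question would be started.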