I'm trying to use asyncio to handle concurrent network I/O. A very large number of functions are to be scheduled at a single point which vary greatly in time it takes for each to complete. Received data is then processed in a separate process for each output.
The order in which the data is processed is not relevant, so given the potentially very long waiting period for output I'd like to await
for whatever future finishes first instead of a predefined order.
def fetch(x):
sleep()
async def main():
futures = [loop.run_in_executor(None, fetch, x) for x in range(50)]
for f in futures:
await f
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
Normally, awaiting in order in which futures were queued is fine:
Blue color represents time each task is in executor's queue, i.e. run_in_executor
has been called, but the function was not yet executed, as the executor runs only 5 tasks simultaneously; green is time spent on executing the function itself; and the red is the time spent waiting for all previous futures to await
.
In my case where functions vary in time greatly, there is a lot of time lost on waiting for previous futures in queue to await, while I could be locally processing GET output. This makes my system idle for a while only to get overwhelmed when several outputs complete simultaneously, then jumping back to idle waiting for more requests to finish.
Is there a way to await
whatever future is first completed in the executor?
Looks like you are looking for asyncio.wait with return_when=asyncio.FIRST_COMPLETED
.
def fetch(x):
sleep()
async def main():
futures = [loop.run_in_executor(None, fetch, x) for x in range(50)]
while futures:
done, futures = await asyncio.wait(futures,
loop=loop, return_when=asyncio.FIRST_COMPLETED)
for f in done:
await f
loop = asyncio.get_event_loop()
loop.run_until_complete(main())