Proper way to shutdown asyncio tasks

Pavlos Parissis picture Pavlos Parissis · Jan 10, 2016 · Viewed 26.3k times · Source

I am writing a tool which connects to X number of UNIX sockets, sends a command and saves the output in the local file-system. It runs this every X seconds. In order to perform some cleanup when the tool receives termination signals I register a function(shutdown) to signal.SIGHUP and signal.SIGTERM signals. This function cancels all tasks and then closes the event loop.

My problem is that I get

RuntimeError: Event loop stopped before Future completed

when I send signal.SIGTERM(kill 'pid'). I have read the documentation about canceling tasks twice but I haven't spot what I am doing wrong here.

I also noticed something strange, when I send the termination signal the program is in sleep mode and I see in the log that it wakes up the pull_stats() coroutine, you can see this in th first 2 lines of the log.

Log:

21:53:44,194 [23031] [MainThread:supervisor  ] DEBUG    **sleeping for 9.805s secs**
21:53:45,857 [23031] [MainThread:pull_stats  ] INFO     pull statistics
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin1.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin4.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin3.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin3.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin2.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin2.sock
21:53:45,858 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin4.sock
21:53:45,859 [23031] [MainThread:get         ] DEBUG    connecting to UNIX socket /run/haproxy/admin1.sock
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     received stop signal, cancelling tasks...
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,859 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,860 [23031] [MainThread:shutdown    ] INFO     True
21:53:45,860 [23031] [MainThread:shutdown    ] INFO     stopping event loop
21:53:45,860 [23031] [MainThread:shutdown    ] INFO     bye, exiting...
Traceback (most recent call last):
  File "./pull.py", line 249, in <module>
    main()
  File "./pull.py", line 245, in main
    supervisor(loop, config)
  File "./pull.py", line 161, in supervisor
    config['pull']['socket-dir'], storage_dir, loop))
  File "/usr/lib/python3.4/asyncio/base_events.py", line 274, in run_until_complete
    raise RuntimeError('Event loop stopped before Future completed.')
RuntimeError: Event loop stopped before Future completed.

Here is the code:

def shutdown(loop):
    LOGGER.info('received stop signal, cancelling tasks...')
    for task in asyncio.Task.all_tasks():
        LOGGER.info(task.cancel())
    LOGGER.info('stopping event loop')
    loop.stop()
    LOGGER.info('bye, exiting...')


def write_file(filename, data):
    try:
        with open(filename, 'w') as file_handle:
            file_handle.write(data.decode())
    except OSError as exc:
        return False
    else:
        return True


@asyncio.coroutine
def get(socket_file, cmd, storage_dir, loop):
    connect = asyncio.open_unix_connection(socket_file)
    reader, writer = yield from asyncio.wait_for(connect, 1)

    writer.write('{c}\n'.format(c=cmd).encode())
    data = yield from reader.read()
    writer.close()

    filename = os.path.basename(socket_file) + '_' + cmd.split()[1]
    filename = os.path.join(storage_dir, filename)
    result = yield from loop.run_in_executor(None, write_file, filename, data)

    return result


@asyncio.coroutine
def pull_stats(socket_dir, storage_dir, loop):
    socket_files = glob.glob(socket_dir + '/*sock*')
    coroutines = [get(socket_file, cmd, storage_dir, loop)
                  for socket_file in socket_files
                  for cmd in CMDS]
    status = yield from asyncio.gather(*coroutines)

    if len(set(status)) == 1 and True in set(status):
        return True
    else:
        return False


def supervisor(loop, config):
    dst_dir = config.get('pull', 'dst-dir')
    tmp_dst_dir = config.get('pull', 'tmp-dst-dir')

    while True:
        start_time = int(time.time())
        storage_dir = os.path.join(tmp_dst_dir, str(start_time))

        try:
            os.makedirs(storage_dir)
        except OSError as exc:
            msg = "failed to create directory {d}:{e}".format(d=storage_dir,
                                                              e=exc)
            LOGGER.critical(msg)

        # Launch all connections.
        result = loop.run_until_complete(pull_stats(
            config['pull']['socket-dir'], storage_dir, loop))

        if result:
            try:
                shutil.move(storage_dir, dst_dir)
            except OSError as exc:
                LOGGER.critical("failed to move %s to %s: %s", storage_dir,
                                dst_dir, exc)
                break
            else:
                LOGGER.info('statistics are saved in %s', os.path.join(
                    dst_dir, os.path.basename(storage_dir)))
        else:
            LOGGER.critical('failed to pull stats')
            shutil.rmtree(storage_dir)

        sleep = config.getint('pull', 'pull-interval') - (time.time() -
                                                          start_time)
        if 0 < sleep < config.getint('pull', 'pull-interval'):
            time.sleep(sleep)
    loop.close()
    sys.exit(1)


def main():
    args = docopt(__doc__, version=VERSION)
    config = ConfigParser(interpolation=ExtendedInterpolation())
    config.read_dict(copy.copy(DEFAULT_OPTIONS))
    config.read(args['--file'])

    loop = asyncio.get_event_loop()

    loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop))
    loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop))

    num_level = getattr(logging, config.get('pull', 'loglevel').upper(), None)
    LOGGER.setLevel(num_level)

    supervisor(loop, config)

# This is the standard boilerplate that calls the main() function.
if __name__ == '__main__':
    main()

Answer

kwarunek picture kwarunek · Jan 11, 2016

The cancellation is not immediate and requires running ioloop to be resolved with exception CancelledError. Remove ioloop.stop from shutdown and handle exception in supervisor, to make things work. Below simplified example.

Important is, however you can cancel Task, it only stops watching/waiting for end/results and loop won't handle further events for it. But the underneath request/pipe will not be stopped.

Simplified example:

import asyncio
import functools
import logging
import signal
import sys
from concurrent.futures import CancelledError


def shutdown(loop):
    logging.info('received stop signal, cancelling tasks...')
    for task in asyncio.Task.all_tasks():
        task.cancel()
    logging.info('bye, exiting in a minute...')    


@asyncio.coroutine
def get(i):
    logging.info('sleep for %d', i)
    yield from asyncio.sleep(i)    


@asyncio.coroutine
def pull_stats():
    coroutines = [get(i) for i in range(10,20)]
    status = yield from asyncio.gather(*coroutines)


def supervisor(loop):
    try:
        while True:
            result = loop.run_until_complete(pull_stats())
    except CancelledError:
        logging.info('CancelledError')
    loop.close()
    sys.exit(1)


def main():
    logging.getLogger().setLevel(logging.INFO)
    loop = asyncio.get_event_loop()
    loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop))
    loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop))
    supervisor(loop)


if __name__ == '__main__':
    main()

Note, that if you cancel only gather's Future, all children will be set as cancelled as well.

And the sleep thing

Any receipt of a signal or interrupt causes the program to resume execution. So when the process receive SIGTERM and handler is set, python allows you to handle it, to do this thread is resumed and sighandler is called. Due to implementation of ioloop and its signal handling, it keeps running after wake.