I am writing a tool which connects to X number of UNIX sockets, sends a command and saves the output in the local file-system. It runs this every X seconds. In order to perform some cleanup when the tool receives termination signals I register a function(shutdown) to signal.SIGHUP and signal.SIGTERM signals. This function cancels all tasks and then closes the event loop.
My problem is that I get
RuntimeError: Event loop stopped before Future completed
when I send signal.SIGTERM(kill 'pid'). I have read the documentation about canceling tasks twice but I haven't spot what I am doing wrong here.
I also noticed something strange, when I send the termination signal the program is in sleep mode and I see in the log that it wakes up the pull_stats() coroutine, you can see this in th first 2 lines of the log.
Log:
21:53:44,194 [23031] [MainThread:supervisor ] DEBUG **sleeping for 9.805s secs**
21:53:45,857 [23031] [MainThread:pull_stats ] INFO pull statistics
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin1.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin4.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin3.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin3.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin2.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin2.sock
21:53:45,858 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin4.sock
21:53:45,859 [23031] [MainThread:get ] DEBUG connecting to UNIX socket /run/haproxy/admin1.sock
21:53:45,859 [23031] [MainThread:shutdown ] INFO received stop signal, cancelling tasks...
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,859 [23031] [MainThread:shutdown ] INFO True
21:53:45,860 [23031] [MainThread:shutdown ] INFO True
21:53:45,860 [23031] [MainThread:shutdown ] INFO stopping event loop
21:53:45,860 [23031] [MainThread:shutdown ] INFO bye, exiting...
Traceback (most recent call last):
File "./pull.py", line 249, in <module>
main()
File "./pull.py", line 245, in main
supervisor(loop, config)
File "./pull.py", line 161, in supervisor
config['pull']['socket-dir'], storage_dir, loop))
File "/usr/lib/python3.4/asyncio/base_events.py", line 274, in run_until_complete
raise RuntimeError('Event loop stopped before Future completed.')
RuntimeError: Event loop stopped before Future completed.
Here is the code:
def shutdown(loop):
LOGGER.info('received stop signal, cancelling tasks...')
for task in asyncio.Task.all_tasks():
LOGGER.info(task.cancel())
LOGGER.info('stopping event loop')
loop.stop()
LOGGER.info('bye, exiting...')
def write_file(filename, data):
try:
with open(filename, 'w') as file_handle:
file_handle.write(data.decode())
except OSError as exc:
return False
else:
return True
@asyncio.coroutine
def get(socket_file, cmd, storage_dir, loop):
connect = asyncio.open_unix_connection(socket_file)
reader, writer = yield from asyncio.wait_for(connect, 1)
writer.write('{c}\n'.format(c=cmd).encode())
data = yield from reader.read()
writer.close()
filename = os.path.basename(socket_file) + '_' + cmd.split()[1]
filename = os.path.join(storage_dir, filename)
result = yield from loop.run_in_executor(None, write_file, filename, data)
return result
@asyncio.coroutine
def pull_stats(socket_dir, storage_dir, loop):
socket_files = glob.glob(socket_dir + '/*sock*')
coroutines = [get(socket_file, cmd, storage_dir, loop)
for socket_file in socket_files
for cmd in CMDS]
status = yield from asyncio.gather(*coroutines)
if len(set(status)) == 1 and True in set(status):
return True
else:
return False
def supervisor(loop, config):
dst_dir = config.get('pull', 'dst-dir')
tmp_dst_dir = config.get('pull', 'tmp-dst-dir')
while True:
start_time = int(time.time())
storage_dir = os.path.join(tmp_dst_dir, str(start_time))
try:
os.makedirs(storage_dir)
except OSError as exc:
msg = "failed to create directory {d}:{e}".format(d=storage_dir,
e=exc)
LOGGER.critical(msg)
# Launch all connections.
result = loop.run_until_complete(pull_stats(
config['pull']['socket-dir'], storage_dir, loop))
if result:
try:
shutil.move(storage_dir, dst_dir)
except OSError as exc:
LOGGER.critical("failed to move %s to %s: %s", storage_dir,
dst_dir, exc)
break
else:
LOGGER.info('statistics are saved in %s', os.path.join(
dst_dir, os.path.basename(storage_dir)))
else:
LOGGER.critical('failed to pull stats')
shutil.rmtree(storage_dir)
sleep = config.getint('pull', 'pull-interval') - (time.time() -
start_time)
if 0 < sleep < config.getint('pull', 'pull-interval'):
time.sleep(sleep)
loop.close()
sys.exit(1)
def main():
args = docopt(__doc__, version=VERSION)
config = ConfigParser(interpolation=ExtendedInterpolation())
config.read_dict(copy.copy(DEFAULT_OPTIONS))
config.read(args['--file'])
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop))
loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop))
num_level = getattr(logging, config.get('pull', 'loglevel').upper(), None)
LOGGER.setLevel(num_level)
supervisor(loop, config)
# This is the standard boilerplate that calls the main() function.
if __name__ == '__main__':
main()
The cancellation is not immediate and requires running ioloop to be resolved with exception CancelledError
. Remove ioloop.stop
from shutdown and handle exception in supervisor, to make things work. Below simplified example.
Important is, however you can cancel Task
, it only stops watching/waiting for end/results and loop won't handle further events for it. But the underneath request/pipe will not be stopped.
Simplified example:
import asyncio
import functools
import logging
import signal
import sys
from concurrent.futures import CancelledError
def shutdown(loop):
logging.info('received stop signal, cancelling tasks...')
for task in asyncio.Task.all_tasks():
task.cancel()
logging.info('bye, exiting in a minute...')
@asyncio.coroutine
def get(i):
logging.info('sleep for %d', i)
yield from asyncio.sleep(i)
@asyncio.coroutine
def pull_stats():
coroutines = [get(i) for i in range(10,20)]
status = yield from asyncio.gather(*coroutines)
def supervisor(loop):
try:
while True:
result = loop.run_until_complete(pull_stats())
except CancelledError:
logging.info('CancelledError')
loop.close()
sys.exit(1)
def main():
logging.getLogger().setLevel(logging.INFO)
loop = asyncio.get_event_loop()
loop.add_signal_handler(signal.SIGHUP, functools.partial(shutdown, loop))
loop.add_signal_handler(signal.SIGTERM, functools.partial(shutdown, loop))
supervisor(loop)
if __name__ == '__main__':
main()
Note, that if you cancel only gather's
Future, all children will be set as cancelled as well.
And the sleep thing
Any receipt of a signal or interrupt causes the program to resume execution. So when the process receive SIGTERM and handler is set, python allows you to handle it, to do this thread is resumed and sighandler is called. Due to implementation of ioloop and its signal handling, it keeps running after wake.