Python timeout context manager with threads

San4ez picture San4ez · Feb 22, 2013 · Viewed 9.2k times · Source

I have timeout context manager that works perfectly with signals but it raises error in multithread mode because signals work only in main thread.

def timeout_handler(signum, frame):
    raise TimeoutException()

@contextmanager
def timeout(seconds):
    old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)

I've seen decorator implementation of timeout but I don't know how to pass yield inside class derived from threading.Thread. My variant won't work.

@contextmanager
def timelimit(seconds):
    class FuncThread(threading.Thread):
        def run(self):
            yield

    it = FuncThread()        
    it.start()
    it.join(seconds)

    if it.isAlive():
        raise TimeoutException()

Answer

Mr Fooz picture Mr Fooz · Mar 3, 2013

If the code guarded by the context manager is loop-based, consider handling this the way people handle thread killing. Killing another thread is generally unsafe, so the standard approach is to have the controlling thread set a flag that's visible to the worker thread. The worker thread periodically checks that flag and cleanly shuts itself down. Here's how you can do something analogous with timeouts:

class timeout(object):
    def __init__(self, seconds):
        self.seconds = seconds
    def __enter__(self):
        self.die_after = time.time() + self.seconds
        return self
    def __exit__(self, type, value, traceback):
        pass
    @property
    def timed_out(self):
        return time.time() > self.die_after

Here's a single-threaded usage example:

with timeout(1) as t:
    while True: # this will take a long time without a timeout
        # periodically check for timeouts
        if t.timed_out:
            break # or raise an exception
        # do some "useful" work
        print "."
        time.sleep(0.2)

and a multithreaded one:

import thread
def print_for_n_secs(string, seconds):
    with timeout(seconds) as t:
        while True:
            if t.timed_out:
                break # or raise an exception
            print string,
            time.sleep(0.5)

for i in xrange(5):
    thread.start_new_thread(print_for_n_secs,
                            ('thread%d' % (i,), 2))
    time.sleep(0.25)

This approach is more intrusive than using signals but it works for arbitrary threads.