Python: concurrent.futures How to make it cancelable?

Ketzu picture Ketzu · Mar 14, 2017 · Viewed 12.2k times · Source

Python concurrent.futures and ProcessPoolExecutor provide a neat interface to schedule and monitor tasks. Futures even provide a .cancel() method:

cancel(): Attempt to cancel the call. If the call is currently being executed and cannot be cancelled then the method will return False, otherwise the call will be cancelled and the method will return True.

Unfortunately in a simmilar question (concerning asyncio) the answer claims running tasks are uncancelable using this snipped of the documentation, but the docs dont say that, only if they are running AND uncancelable.

Submitting multiprocessing.Events to the processes is also not trivially possible (doing so via parameters as in multiprocess.Process returns a RuntimeError)

What am I trying to do? I would like to partition a search space and run a task for every partition. But it is enough to have ONE solution and the process is CPU intensive. So is there an actual comfortable way to accomplish this that does not offset the gains by using ProcessPool to begin with?

Example:

from concurrent.futures import ProcessPoolExecutor, FIRST_COMPLETED, wait

# function that profits from partitioned search space
def m_run(partition):
    for elem in partition:
        if elem == 135135515:
            return elem
    return False

futures = []
# used to create the partitions
steps = 100000000
with ProcessPoolExecutor(max_workers=4) as pool:
    for i in range(4):
        # run 4 tasks with a partition, but only *one* solution is needed
        partition = range(i*steps,(i+1)*steps)
        futures.append(pool.submit(m_run, partition))

    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    for d in done:
        print(d.result())

    print("---")
    for d in not_done:
        # will return false for Cancel and Result for all futures
        print("Cancel: "+str(d.cancel()))
        print("Result: "+str(d.result()))

Answer

noxdafox picture noxdafox · Aug 8, 2017

Unfortunately, running Futures cannot be cancelled. I believe the core reason is to ensure the same API over different implementations (it's not possible to interrupt running threads or coroutines).

The Pebble library was designed to overcome this and other limitations.

from pebble import ProcessPool

def function(foo, bar=0):
    return foo + bar

with ProcessPool() as pool:
    future = pool.schedule(function, args=[1])

    # if running, the container process will be terminated 
    # a new process will be started consuming the next task
    future.cancel()