Can I use the work-stealing behaviour of ForkJoinPool to avoid a thread starvation deadlock?

Tavian Barnes · Oct 26, 2014

A thread starvation deadlock occurs in a normal thread pool if all the threads in the pool are waiting for queued tasks in the same pool to complete. ForkJoinPool avoids this problem by stealing work from other threads from inside the join() call, rather than simply waiting. For example:

private static class ForkableTask extends RecursiveTask<Integer> {
    private final CyclicBarrier barrier;

    ForkableTask(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    protected Integer compute() {
        try {
            barrier.await();
            return 1;
        } catch (InterruptedException | BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }
}

@Test
public void testForkJoinPool() throws Exception {
    final int parallelism = 4;
    final ForkJoinPool pool = new ForkJoinPool(parallelism);
    final CyclicBarrier barrier = new CyclicBarrier(parallelism);

    final List<ForkableTask> forkableTasks = new ArrayList<>(parallelism);
    for (int i = 0; i < parallelism; ++i) {
        forkableTasks.add(new ForkableTask(barrier));
    }

    int result = pool.invoke(new RecursiveTask<Integer>() {
        @Override
        protected Integer compute() {
            for (ForkableTask task : forkableTasks) {
                task.fork();
            }

            int result = 0;
            for (ForkableTask task : forkableTasks) {
                result += task.join();
            }
            return result;
        }
    });
    assertThat(result, equalTo(parallelism));
}

But when using the ExecutorService interface to a ForkJoinPool, work-stealing doesn't seem to occur. For example:

private static class CallableTask implements Callable<Integer> {
    private final CyclicBarrier barrier;

    CallableTask(CyclicBarrier barrier) {
        this.barrier = barrier;
    }

    @Override
    public Integer call() throws Exception {
        barrier.await();
        return 1;
    }
}

@Test
public void testWorkStealing() throws Exception {
    final int parallelism = 4;
    final ExecutorService pool = new ForkJoinPool(parallelism);
    final CyclicBarrier barrier = new CyclicBarrier(parallelism);

    final List<CallableTask> callableTasks = Collections.nCopies(parallelism, new CallableTask(barrier));
    int result = pool.submit(new Callable<Integer>() {
        @Override
        public Integer call() throws Exception {
            int result = 0;
            // Deadlock in invokeAll(), rather than stealing work
            for (Future<Integer> future : pool.invokeAll(callableTasks)) {
                result += future.get();
            }
            return result;
        }
    }).get();
    assertThat(result, equalTo(parallelism));
}

From a cursory look at ForkJoinPool's implementation, all the regular ExecutorService APIs are implemented using ForkJoinTasks, so I'm not sure why a deadlock occurs.

Answer

Philipp Wendler · Oct 28, 2014

You are almost answering your own question. The key is your statement that "ForkJoinPool avoids this problem by stealing work from other threads from inside the join() call". Whenever a thread is blocked for any reason other than ForkJoinTask.join(), this work stealing does not occur, and the thread simply waits and does nothing.

The reason for this is that in Java it is not possible for the ForkJoinPool to prevent its threads from blocking and instead give them something else to work on. The thread itself needs to avoid blocking and instead ask the pool for work it should do. And this is only implemented in the ForkJoinTask.join() method, not in any other blocking method. If you use a Future inside a ForkJoinPool, you will also see the starvation deadlock.
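To make the cooperative nature of join() concrete, here is a minimal sketch. The class JoinHelpsDemo, the task SumTask, and the threshold of 100 are hypothetical names and values chosen for illustration. The pool has a single worker thread, so if join() merely blocked, the forked subtask could never run. The computation still completes, because the worker executes the pending subtask itself from inside join().

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

class JoinHelpsDemo {

    /** Hypothetical example task: sums the integers in [from, to). */
    static final class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 100; // arbitrary cut-off for this sketch
        private final int from;
        private final int to;

        SumTask(int from, int to) {
            this.from = from;
            this.to = to;
        }

        @Override
        protected Long compute() {
            if (to - from <= THRESHOLD) {
                long sum = 0;
                for (int i = from; i < to; i++) {
                    sum += i;
                }
                return sum;
            }
            int mid = from + (to - from) / 2;
            SumTask left = new SumTask(from, mid);
            SumTask right = new SumTask(mid, to);
            left.fork();                    // queue the left half in this pool
            long rightSum = right.compute(); // work on the right half directly
            // With one worker nobody else can run 'left', so join() runs it here.
            return rightSum + left.join();
        }
    }

    static long sum(int from, int to) {
        ForkJoinPool pool = new ForkJoinPool(1); // a single worker thread
        try {
            return pool.invoke(new SumTask(from, to));
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sum(1, 1001)); // sum of 1..1000
    }
}
```

The same recursion written with submit() and Future.get() on a single-threaded executor from Executors would starve, because the only thread would wait for a subtask that no thread is free to run.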

Why is work stealing only implemented in ForkJoinTask.join() and not in any other blocking methods in the Java API? Well, there are many such blocking methods (Object.wait(), Future.get(), any of the concurrency primitives in java.util.concurrent, I/O methods, etc.), and they have nothing to do with ForkJoinPool, which is just an arbitrary class in the API, so adding special cases to all these methods would be bad design. It would also lead to possibly very surprising and undesired effects. Imagine, for example, a user passing a task to an ExecutorService that waits on a Future, and then finding out that the task hangs for a long time in Future.get() just because the running thread stole some other (long-running) work item instead of waiting for the Future and continuing immediately after the result is available. Once a thread starts working on another task, it cannot return to the original task until the second task is finished. Thus it is actually a good thing that other blocking methods do not do work stealing. For a ForkJoinTask, this problem does not exist, because it is not important that the primary task is continued as soon as possible; it is only important that all tasks together are handled as efficiently as possible.

It is also not possible to implement your own method for doing work stealing inside a ForkJoinPool, because none of the relevant parts are public.

However, there is a second way to prevent starvation deadlocks, called managed blocking. It does not use work stealing (which avoids the problem mentioned above), but it also requires the thread that is about to block to actively cooperate with the thread pool. With managed blocking, the thread tells the thread pool that it may block before it calls the potentially blocking method, and also informs the pool when the blocking method has finished. The thread pool then knows that there is a risk of a starvation deadlock, and may spawn additional threads if all of its threads are currently in some blocking operation and there are still other tasks to execute. Note that this is less efficient than work stealing because of the overhead of the additional threads. If you implement a recursive parallel algorithm with ordinary futures and managed blocking instead of with ForkJoinTask and work stealing, the number of additional threads can get very large (because in the "divide" phase of the algorithm, a lot of tasks will be created and given to threads that immediately block and wait for results from sub-tasks). However, a starvation deadlock is still prevented, and it avoids the problem that a task has to wait a long time because its thread started working on another task in the meantime.

Java's ForkJoinPool also supports managed blocking. To use it, implement the interface ForkJoinPool.ManagedBlocker so that the potentially blocking method the task wants to execute is called from within the block() method of this interface. The task must then not call the blocking method directly, but instead call the static method ForkJoinPool.managedBlock(ManagedBlocker). This method handles the communication with the thread pool before and after the blocking. It also works if the current task is not executing within a ForkJoinPool; in that case it simply calls the blocking method.
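As a rough sketch of how this could look for the barrier from the question: the class ManagedBarrierDemo, the helper BarrierBlocker, and the method runTasks below are hypothetical names, not part of the JDK. The pool has fewer worker threads than the barrier has parties, so plain barrier.await() calls would starve. Wrapping the call in a ManagedBlocker lets the pool add threads while workers are blocked. How many extra threads the pool actually creates is up to its implementation, and the 30-second timeout is only there so the sketch fails instead of hanging if something goes wrong.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class ManagedBarrierDemo {

    /** Hypothetical helper: runs barrier.await() under managed blocking. */
    static final class BarrierBlocker implements ForkJoinPool.ManagedBlocker {
        private final CyclicBarrier barrier;
        private boolean done; // only touched by the thread calling managedBlock()

        BarrierBlocker(CyclicBarrier barrier) {
            this.barrier = barrier;
        }

        @Override
        public boolean block() throws InterruptedException {
            try {
                barrier.await(); // the potentially blocking call
            } catch (BrokenBarrierException e) {
                throw new IllegalStateException(e);
            }
            done = true;
            return true; // no further blocking is necessary
        }

        @Override
        public boolean isReleasable() {
            return done; // true once the barrier has been passed
        }
    }

    /** Submits 'parties' barrier tasks to a pool with 'parallelism' workers. */
    static int runTasks(int parallelism, int parties) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        CyclicBarrier barrier = new CyclicBarrier(parties);
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < parties; i++) {
                Callable<Integer> task = () -> {
                    // Announce the blocking call to the pool instead of calling await() directly.
                    ForkJoinPool.managedBlock(new BarrierBlocker(barrier));
                    return 1;
                };
                futures.add(pool.submit(task));
            }
            int sum = 0;
            for (Future<Integer> future : futures) {
                sum += future.get(30, TimeUnit.SECONDS);
            }
            return sum;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Two worker threads, four barrier parties.
        System.out.println(runTasks(2, 4));
    }
}
```

Each call uses a fresh BarrierBlocker, so the done flag is never shared between threads. The blocker only reports itself as releasable after its own await() has returned.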

The only place I have found in the Java API (for Java 7) that actually uses managed blocking is the class Phaser. (This class is a reusable synchronization barrier, similar to CyclicBarrier and CountDownLatch, but more flexible and powerful.) So synchronizing with a Phaser inside a ForkJoinPool task should use managed blocking and can avoid starvation deadlocks (but ForkJoinTask.join() is still preferable because it uses work stealing instead of managed blocking). This works regardless of whether you use the ForkJoinPool directly or via its ExecutorService interface. However, it will not work if you use any other ExecutorService, such as those created by the class Executors, because these do not support managed blocking.
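A minimal sketch of the Phaser variant, assuming the pool compensates for blocked workers as described above. PhaserInPoolDemo and runTasks are hypothetical names, and the 30-second timeout is only a safety net for the sketch. No custom ManagedBlocker is needed here, because the waiting is done by Phaser itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class PhaserInPoolDemo {

    /** Runs 'parties' tasks that all meet at a Phaser, on 'parallelism' workers. */
    static int runTasks(int parallelism, int parties) {
        // Used through the ExecutorService interface, as in the question.
        ExecutorService pool = new ForkJoinPool(parallelism);
        Phaser phaser = new Phaser(parties); // all parties registered up front
        try {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < parties; i++) {
                Callable<Integer> task = () -> {
                    // Waits until every party has arrived at the current phase.
                    phaser.arriveAndAwaitAdvance();
                    return 1;
                };
                futures.add(pool.submit(task));
            }
            int sum = 0;
            for (Future<Integer> future : futures) {
                sum += future.get(30, TimeUnit.SECONDS);
            }
            return sum;
        } catch (InterruptedException | ExecutionException | TimeoutException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // Two worker threads, four parties.
        System.out.println(runTasks(2, 4));
    }
}
```

Replacing the ForkJoinPool with Executors.newFixedThreadPool(2) in this sketch would be expected to starve: two threads would wait at the phaser while the other two tasks stay queued.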

In Scala, the use of managed blocking is more widespread (description, API).