Java CompletableFuture.complete() blocks

QUAN Nguyen Hoang · Nov 3, 2016

I have a problem when using CompletableFuture in Java. I have 2 select requests whose futures are completed when the responses arrive from the server.

In the connection thread (THREAD-1), which uses a reactor, I use:

if(hasException) {
   selectFuture.completeExceptionally(new ClientException(errorCode));
} else {
   System.out.println("Before complete future");
   selectFuture.complete(result);
   System.out.println("After complete future");
}

And in another thread (THREAD-2), I use:

CompletableFuture.allOf(allSelect).whenComplete((aVoid, throwable) -> {
   System.out.println("Receive all future");
   // Do something here
});

My situation is that the system prints "Receive all future", but THREAD-1 is blocked when calling selectFuture.complete(result); it never returns from that call. If, in THREAD-2, I use CompletableFuture.allOf(allSelect).get() instead, THREAD-1 runs correctly. But using CompletableFuture.get() reduces performance, so I would like to use CompletableFuture.whenComplete().

Can anyone help explain the cause of the blocking?

Thanks!

Answer

Sotirios Delimanolis · Nov 3, 2016

The complete call triggers all dependent CompletionStages.

So if you've previously registered a BiConsumer with whenComplete, complete will invoke it in its calling thread. In your case, the call to complete will return only when the BiConsumer you've passed to whenComplete finishes. This is described in the class javadoc:

Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.

("Any other caller" covers the opposite situation, where the thread calling whenComplete applies the BiConsumer itself because the target CompletableFuture has already been completed.)
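
That opposite situation can be sketched as follows. This is only an illustration: the class name AlreadyCompletedDemo and the helper callbackThreadName are made up for this example, and it relies on the behavior of current OpenJDK implementations, since the javadoc only says the action "may be" performed by the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class, not part of the question's code.
class AlreadyCompletedDemo {

    // Registers a whenComplete callback on an already-completed future and
    // returns the name of the thread that ran the callback.
    static String callbackThreadName() {
        CompletableFuture<String> future = new CompletableFuture<>();
        // No dependents are registered yet, so this returns immediately.
        future.complete("completed");

        AtomicReference<String> ranOn = new AtomicReference<>();
        // The future is already complete, so (in OpenJDK) the callback runs
        // synchronously in the thread that calls whenComplete.
        future.whenComplete((r, t) -> ranOn.set(Thread.currentThread().getName()));
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("registering thread: " + Thread.currentThread().getName());
        System.out.println("callback ran on:    " + callbackThreadName());
    }
}
```

When run this way, both lines should report the same thread, because the thread that registers the callback is also the one that executes it.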

Here's a small program to illustrate the behavior:

public static void main(String[] args) throws Exception {
    CompletableFuture<String> future = new CompletableFuture<String>();
    future.whenComplete((r, t) -> {
        System.out.println("before sleep, executed in thread " + Thread.currentThread());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after sleep, executed in thread " + Thread.currentThread());
    });

    System.out.println(Thread.currentThread());
    future.complete("completed");
    System.out.println("done");
}

This will print

Thread[main,5,main]
before sleep, executed in thread Thread[main,5,main]
after sleep, executed in thread Thread[main,5,main]
done

showing that the BiConsumer was applied in the main thread, the one that called complete.

You can use whenCompleteAsync to force execution of the BiConsumer in a separate thread.

[...] that executes the given action using this stage's default asynchronous execution facility when this stage completes.

For example,

public static void main(String[] args) throws Exception {
    CompletableFuture<String> future = new CompletableFuture<String>();
    CompletableFuture<?> done = future.whenCompleteAsync((r, t) -> {
        System.out.println("before sleep, executed in thread " + Thread.currentThread());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after sleep, executed in thread " + Thread.currentThread());
    });

    System.out.println(Thread.currentThread());
    future.complete("completed");
    System.out.println("done");
    done.get();
}

will print

Thread[main,5,main]
done
before sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
after sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]

showing that the BiConsumer was applied in a separate thread.
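
Applied to the setup in the question, a minimal sketch might look like the following. The names select1, select2, callbackPool and the class AllOfAsyncDemo are assumptions made for illustration; the question does not show how allSelect is built. It uses the whenCompleteAsync overload that takes an explicit Executor, so the callback does not run on the connection thread or on the common pool.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; the futures stand in for the question's selects.
class AllOfAsyncDemo {

    // Completes two futures and returns the name of the thread that ran
    // the callback registered on allOf(...).
    static String run() throws Exception {
        CompletableFuture<String> select1 = new CompletableFuture<>();
        CompletableFuture<String> select2 = new CompletableFuture<>();

        // Dedicated single-thread executor for callbacks (an assumption).
        ExecutorService callbackPool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-thread"));
        try {
            AtomicReference<String> ranOn = new AtomicReference<>();
            CompletableFuture<Void> done = CompletableFuture.allOf(select1, select2)
                    .whenCompleteAsync((aVoid, throwable) ->
                            ranOn.set(Thread.currentThread().getName()), callbackPool);

            // Each complete() only hands the callback to callbackPool;
            // it does not wait for the callback to finish.
            select1.complete("first result");
            select2.complete("second result");

            // Wait here only so the demo can report where the callback ran.
            done.get(10, TimeUnit.SECONDS);
            return ranOn.get();
        } finally {
            callbackPool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("callback ran on: " + run());
    }
}
```

With this arrangement the thread that calls complete is free to continue as soon as the task has been submitted, and the callback is reported as running on callback-thread.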