I have a problem using CompletableFuture in Java. I have two select requests that are completed when the responses are received from the server.
In the connection thread (THREAD-1), which uses a reactor, I do:
    if (hasException) {
        selectFuture.completeExceptionally(new ClientException(errorCode));
    } else {
        System.out.println("Before complete future");
        selectFuture.complete(result);
        System.out.println("After complete future");
    }
And in another thread (THREAD-2), I use:
    CompletableFuture.allOf(allSelect).whenComplete((aVoid, throwable) -> {
        System.out.println("Receive all future");
        // Do sth here
    });
My situation is that the system prints "Receive all future", but THREAD-1 is blocked in the call to future.complete(result);
it never returns from that call.
If I use CompletableFuture.allOf(allSelect).get() in THREAD-2 instead, THREAD-1 runs correctly. But using CompletableFuture.get() reduces performance, so I would like to use CompletableFuture.whenComplete().
Can anyone help explain what causes the blocking?
Thanks!
The complete call triggers all dependent CompletionStages.
So if you've previously registered a BiConsumer with whenComplete, complete will invoke it in its calling thread. In your case, the call to complete will only return when the BiConsumer you've passed to whenComplete finishes. This is described in the class javadoc:
Actions supplied for dependent completions of non-async methods may be performed by the thread that completes the current CompletableFuture, or by any other caller of a completion method.
("By any other caller" covers the opposite situation, where the thread calling whenComplete would itself apply the BiConsumer if the target CompletableFuture had already been completed.)
Here's a small program to illustrate the behavior:
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<String>();
        future.whenComplete((r, t) -> {
            System.out.println("before sleep, executed in thread " + Thread.currentThread());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("after sleep, executed in thread " + Thread.currentThread());
        });
        System.out.println(Thread.currentThread());
        future.complete("completed");
        System.out.println("done");
    }
This will print
Thread[main,5,main]
before sleep, executed in thread Thread[main,5,main]
after sleep, executed in thread Thread[main,5,main]
done
showing that the BiConsumer was applied in the main thread, the one that called complete.
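The opposite case from the javadoc note (the future is already completed when whenComplete is called) can be sketched the same way. This is a minimal illustration, not something from the question: the class name AlreadyCompletedDemo and the helper callbackThreadName are made up for the example. The javadoc only says the action "may" run in the caller, but as far as I know the OpenJDK implementation runs it right away in the thread that registers it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

public class AlreadyCompletedDemo {

    // Hypothetical helper: registers a callback on an already-completed future
    // and returns the name of the thread that ran the callback.
    static String callbackThreadName() {
        CompletableFuture<String> future = new CompletableFuture<>();
        future.complete("completed"); // completed before any callback is registered

        AtomicReference<String> ranIn = new AtomicReference<>();
        // The future already holds a result, so the action is applied during this call.
        future.whenComplete((r, t) -> ranIn.set(Thread.currentThread().getName()));
        return ranIn.get();
    }

    public static void main(String[] args) {
        System.out.println("caller:   " + Thread.currentThread().getName());
        System.out.println("callback: " + callbackThreadName());
    }
}
```

Both lines should show the same thread name, which means a slow BiConsumer can block either side: the thread calling complete, or the thread calling whenComplete.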
You can use whenCompleteAsync to force execution of the BiConsumer in a separate thread.
[...] that executes the given action using this stage's default asynchronous execution facility when this stage completes.
For example,
    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = new CompletableFuture<String>();
        CompletableFuture<?> done = future.whenCompleteAsync((r, t) -> {
            System.out.println("before sleep, executed in thread " + Thread.currentThread());
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("after sleep, executed in thread " + Thread.currentThread());
        });
        System.out.println(Thread.currentThread());
        future.complete("completed");
        System.out.println("done");
        done.get();
    }
will print
Thread[main,5,main]
done
before sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
after sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
showing that the BiConsumer was applied in a separate thread.
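Applied to the setup in the question, this might look roughly like the sketch below. It uses the overload of whenCompleteAsync that takes an explicit Executor, so the callback runs neither on the connection thread nor on the common pool. The names here (AllOfAsyncDemo, select1, select2, callbackPool, "callback-thread") are assumptions for illustration only; the real futures would come from your client code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AllOfAsyncDemo {

    // Hypothetical helper: completes two futures from the calling thread and
    // returns the name of the thread that ran the allOf callback.
    static String callbackThreadName() {
        // Stand-ins for the two select futures from the question (assumed names).
        CompletableFuture<String> select1 = new CompletableFuture<>();
        CompletableFuture<String> select2 = new CompletableFuture<>();

        // Assumed dedicated executor for callbacks, using a named daemon thread.
        ExecutorService callbackPool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "callback-thread");
            t.setDaemon(true);
            return t;
        });
        try {
            CompletableFuture<String> ranIn = new CompletableFuture<>();
            CompletableFuture.allOf(select1, select2).whenCompleteAsync(
                    (aVoid, throwable) -> ranIn.complete(Thread.currentThread().getName()),
                    callbackPool);

            // Plays the role of the connection thread: complete() hands the
            // callback to callbackPool instead of running it inline.
            select1.complete("result 1");
            select2.complete("result 2");

            return ranIn.join(); // wait here only so the demo can report the thread name
        } finally {
            callbackPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("completing thread: " + Thread.currentThread().getName());
        System.out.println("callback thread:   " + callbackThreadName());
    }
}
```

The join() at the end exists only so the demo can report a result; in the reactor scenario the connection thread would simply continue after complete() returns.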