I have some CompletableFutures and I want to run them in parallel, waiting for the first one that returns normally.
I know I can use CompletableFuture.anyOf to wait for the first one to complete, but it completes as soon as any future completes, whether normally or exceptionally. I want to ignore exceptions.
List<CompletableFuture<?>> futures = names.stream().map(
(String name) ->
CompletableFuture.supplyAsync(
() ->
// this call may throw exceptions
new Task(name).run()
)
).collect(Collectors.toList());
// FIXME: cannot ignore tasks that complete exceptionally.
CompletableFuture<Object> any = CompletableFuture.anyOf(futures.toArray(new CompletableFuture<?>[0]));
try {
logger.info(any.get().toString());
} catch (Exception e) {
e.printStackTrace();
}
You may use the following helper method:
public static <T>
CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) {
CompletableFuture<T> f=new CompletableFuture<>();
Consumer<T> complete=f::complete;
l.forEach(s -> s.thenAccept(complete));
return f;
}
which you can use like this, to demonstrate that it will ignore earlier exceptions but return the first provided value:
List<CompletableFuture<String>> futures = Arrays.asList(
CompletableFuture.supplyAsync(
() -> { throw new RuntimeException("failing immediately"); }
),
CompletableFuture.supplyAsync(
() -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
return "with 5s delay";
}),
CompletableFuture.supplyAsync(
() -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(10));
return "with 10s delay";
})
);
CompletableFuture<String> c = anyOf(futures);
logger.info(c.join());
One disadvantage of this solution is that it will never complete if all futures complete exceptionally. A solution that provides the first value if there is a successful computation, but completes exceptionally if there is no successful computation at all, is a bit more involved:
public static <T>
CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) {
CompletableFuture<T> f=new CompletableFuture<>();
Consumer<T> complete=f::complete;
CompletableFuture.allOf(
l.stream().map(s -> s.thenAccept(complete)).toArray(CompletableFuture<?>[]::new)
).exceptionally(ex -> { f.completeExceptionally(ex); return null; });
return f;
}
It utilizes the fact that allOf’s exceptionally handler is only invoked after all futures have completed (exceptionally or not) and that a future can be completed only once (leaving special things like obtrude… aside). By the time the exceptionally handler is executed, any attempt to complete the future with a result has already been made, if there was one, so the attempt to complete it exceptionally only succeeds if there was no previous successful completion.
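The "completed only once" property can be checked in isolation. The following is a minimal sketch, not part of the helper itself; the class name CompleteOnceDemo and the method attempt are made up for illustration. It relies only on the documented return values of complete and completeExceptionally, which report whether the call actually caused the transition to a completed state.

```java
import java.util.concurrent.CompletableFuture;

public class CompleteOnceDemo {

    // Hypothetical helper: tries a normal completion first, then an
    // exceptional one, and reports which of the two attempts took effect.
    static boolean[] attempt() {
        CompletableFuture<String> f = new CompletableFuture<>();
        // First attempt wins: f was not completed yet.
        boolean first = f.complete("value");
        // Second attempt is ignored: f already holds "value".
        boolean second = f.completeExceptionally(new RuntimeException("too late"));
        return new boolean[] { first, second };
    }

    public static void main(String[] args) {
        boolean[] r = attempt();
        System.out.println(r[0] + " " + r[1]); // prints "true false"
    }
}
```

This is why the exceptionally handler in the helper can call completeExceptionally unconditionally: if a value arrived earlier, the call is simply a no-op.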
It can be used in exactly the same way as the first solution and only exhibits different behavior if all computations fail, e.g.:
List<CompletableFuture<String>> futures = Arrays.asList(
CompletableFuture.supplyAsync(
() -> { throw new RuntimeException("failing immediately"); }
),
CompletableFuture.supplyAsync(
// delayed to demonstrate that the solution will wait for all completions
// to ensure it doesn't miss a possible successful computation
() -> { LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));
throw new RuntimeException("failing later"); }
)
);
CompletableFuture<String> c = anyOf(futures);
try { logger.info(c.join()); }
catch(CompletionException ex) { logger.severe(ex.toString()); }
The example above uses a delay to demonstrate that the solution will wait for all completions when there is no success, whereas this example on ideone demonstrates how a later success will turn the result into a success. Note that due to Ideone's caching of results you might not notice the delay.
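For readers without access to the linked example, the "later success" case might look roughly like the sketch below. It is not the linked code; the class name LaterSuccessDemo, the method run, and the 200 ms delay are assumptions chosen for illustration. The helper is copied from the second variant so the block is self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;

public class LaterSuccessDemo {

    // Second variant of the helper: first value wins, failure only if all fail.
    public static <T> CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) {
        CompletableFuture<T> f = new CompletableFuture<>();
        Consumer<T> complete = f::complete;
        CompletableFuture.allOf(
            l.stream().map(s -> s.thenAccept(complete)).toArray(CompletableFuture<?>[]::new)
        ).exceptionally(ex -> { f.completeExceptionally(ex); return null; });
        return f;
    }

    // Hypothetical scenario: one future fails at once, another succeeds later.
    static String run() {
        List<CompletableFuture<String>> futures = Arrays.asList(
            CompletableFuture.<String>supplyAsync(
                () -> { throw new RuntimeException("failing immediately"); }),
            CompletableFuture.<String>supplyAsync(
                () -> { LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos(200));
                        return "late success"; })
        );
        // The early failure is ignored; the value arrives once the second future is done.
        return anyOf(futures).join();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "late success"
    }
}
```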
Note that in the case that all futures fail, there is no guarantee about which of the exceptions will get reported. Since the solution waits for all completions in the failure case, any of them could make it to the final result.
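If you want to see which exception ended up in the result, you can walk the cause chain of the CompletionException thrown by join. The sketch below assumes two failing suppliers with made-up messages; the names WhichExceptionDemo and rootMessageOfFailure are hypothetical. Either message may be reported, so the code deliberately makes no claim about which one you will see.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

public class WhichExceptionDemo {

    // Second variant of the helper, repeated so the example is self-contained.
    public static <T> CompletableFuture<T> anyOf(List<? extends CompletionStage<? extends T>> l) {
        CompletableFuture<T> f = new CompletableFuture<>();
        Consumer<T> complete = f::complete;
        CompletableFuture.allOf(
            l.stream().map(s -> s.thenAccept(complete)).toArray(CompletableFuture<?>[]::new)
        ).exceptionally(ex -> { f.completeExceptionally(ex); return null; });
        return f;
    }

    // Returns the message of the innermost cause of the reported failure.
    static String rootMessageOfFailure() {
        List<CompletableFuture<String>> futures = Arrays.asList(
            CompletableFuture.<String>supplyAsync(
                () -> { throw new RuntimeException("first failure"); }),
            CompletableFuture.<String>supplyAsync(
                () -> { throw new RuntimeException("second failure"); })
        );
        try {
            anyOf(futures).join();
            return null; // not expected here, since both suppliers throw
        } catch (CompletionException ex) {
            // Unwrap nested CompletionException layers down to the original exception.
            Throwable root = ex;
            while (root.getCause() != null) {
                root = root.getCause();
            }
            return root.getMessage();
        }
    }

    public static void main(String[] args) {
        // Prints one of the two messages; which one is not guaranteed.
        System.out.println(rootMessageOfFailure());
    }
}
```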