Java collecting results of CompletableFuture from multiple calls

agienka · Oct 30, 2017

I have to run multiple external call operations and then obtain the results in the form of a list. I decided to use the CompletableFuture API, and the code I prepared is pretty disgusting:

The example:

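import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class Main {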
    public static void main(String[] args) {
        String prefix = "collection_";

        List<CompletableFuture<User>> usersResult = IntStream.range(1, 10)
                .boxed()
                .map(num -> prefix.concat("" + num))
                .map(name -> CompletableFuture.supplyAsync(
                        () -> callApi(name)))
                .collect(Collectors.toList());

        try {
            CompletableFuture.allOf(usersResult.toArray(new CompletableFuture[usersResult.size()])).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }

        List<User> users = usersResult //the result I need
                .stream()
                .map(userCompletableFuture -> {
                    try {
                        return userCompletableFuture.get();
                    } catch (InterruptedException | ExecutionException e) {
                        e.printStackTrace();
                    }
                    return null;
                })
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    private static User callApi(String collection) {
        return new User(); //potentially time-consuming operation
    }
}

I have the following questions:

  1. Can I somehow avoid duplicating the try-catch block in the stream, where I'm mapping CompletableFuture to User?
  2. Can this code be less sequential (how can I avoid waiting for all the futures to finish)?
  3. Is it OK to do it this way (will all the futures be resolved in the stream?):

    public class Main {
        public static void main(String[] args) {
            String prefix = "collection_";
    
            List<User> usersResult = IntStream.range(1, 10)
                    .boxed()
                    .map(num -> prefix.concat("" + num))
                    .map(name -> CompletableFuture.supplyAsync(
                            () -> callApi(name)))
                    .filter(Objects::nonNull)
                    .map(userCompletableFuture -> {
                        try {
                            return userCompletableFuture.get();
                        } catch (InterruptedException | ExecutionException e) {
                            e.printStackTrace();
                        }
                        return null;
                    })
                    .collect(Collectors.toList());
        }
    
        private static User callApi(String collection) {
            return new User(); //potentially time-consuming operation
        }
    }
    

Answer

Didier L · Oct 31, 2017

For 1., you can skip the allOf().get() call entirely, since you are waiting on all the futures one by one anyway.¹ That removes the first try-catch. You can then simplify the remaining one by doing the following:

  • use exceptionally() to handle exceptions directly in the future;
  • use join() instead of get() to avoid checked exceptions (once exceptionally() has handled the failure, join() will not throw, as long as the handler itself does not).

For 2., you cannot really make it less sequential, since you need at least two steps: create all the futures, and then process their results.

For 3., if you do everything in a single stream, it will create each future and then immediately wait on it before creating the next one, so you would lose the parallelism. You could use a parallel stream instead, but then there wouldn't be much benefit in using CompletableFutures.
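The single-stream behaviour follows from stream laziness: a sequential stream pushes each element through every stage before it pulls the next one. The sketch below makes that visible by logging the order of submissions and joins. Here callApi is a hypothetical stand-in that returns a String, and LazyStreamDemo, singlePipeline and twoPhases are illustrative names, not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class LazyStreamDemo {

    // Hypothetical stand-in for the external call in the question.
    static String callApi(String name) {
        return "user-of-" + name;
    }

    // One pipeline: each future is joined before the next one is even created.
    static List<String> singlePipeline(List<String> log) {
        return IntStream.range(1, 4)
                .mapToObj(num -> "collection_" + num)
                .map(name -> {
                    log.add("submit " + name);
                    return CompletableFuture.supplyAsync(() -> callApi(name));
                })
                .map(future -> {
                    String user = future.join();
                    log.add("join " + user);
                    return user;
                })
                .collect(Collectors.toList());
    }

    // Two pipelines: collect() forces every future to be created before any join.
    static List<String> twoPhases(List<String> log) {
        List<CompletableFuture<String>> futures = IntStream.range(1, 4)
                .mapToObj(num -> "collection_" + num)
                .map(name -> {
                    log.add("submit " + name);
                    return CompletableFuture.supplyAsync(() -> callApi(name));
                })
                .collect(Collectors.toList());

        return futures.stream()
                .map(future -> {
                    String user = future.join();
                    log.add("join " + user);
                    return user;
                })
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();

        singlePipeline(log);
        // [submit collection_1, join user-of-collection_1, submit collection_2,
        //  join user-of-collection_2, submit collection_3, join user-of-collection_3]
        System.out.println(log);

        log.clear();
        twoPhases(log);
        // [submit collection_1, submit collection_2, submit collection_3,
        //  join user-of-collection_1, join user-of-collection_2, join user-of-collection_3]
        System.out.println(log);
    }
}
```

The log is only written from the calling thread, so the ordering is deterministic. In the single pipeline at most one call is ever in flight, which is why the intermediate list of futures is worth keeping.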

So the final code is:

List<CompletableFuture<User>> usersResult = IntStream.range(1, 10)
        .boxed()
        .map(num -> prefix.concat("" + num))
        .map(name -> CompletableFuture.supplyAsync(() -> callApi(name))
            .exceptionally(e -> {
                e.printStackTrace();
                return null;
            }))
        .collect(Collectors.toList());

List<User> users = usersResult
        .stream()
        .map(CompletableFuture::join)
        .filter(Objects::nonNull)
        .collect(Collectors.toList());
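
To try the snippet above end to end, here is a self-contained sketch. The User class and the failing callApi are hypothetical: the question does not show User, and the simulated failure for "collection_3" is only there to exercise the exceptionally() branch. CollectUsersDemo and fetchUsers are illustrative names as well.

```java
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CollectUsersDemo {

    // Hypothetical minimal User; the original question does not define it.
    static final class User {
        final String collection;

        User(String collection) {
            this.collection = collection;
        }
    }

    // Hypothetical stand-in for the external call; fails for one input on purpose.
    static User callApi(String collection) {
        if (collection.endsWith("_3")) {
            throw new IllegalStateException("simulated failure for " + collection);
        }
        return new User(collection);
    }

    static List<User> fetchUsers(String prefix) {
        // Step 1: start every call before waiting on any of them.
        List<CompletableFuture<User>> futures = IntStream.range(1, 10)
                .mapToObj(num -> prefix + num)
                .map(name -> CompletableFuture.supplyAsync(() -> callApi(name))
                        .exceptionally(e -> {
                            // A failed call is logged and mapped to null.
                            System.err.println("call failed: " + e);
                            return null;
                        }))
                .collect(Collectors.toList());

        // Step 2: wait for each result and drop the nulls left by failed calls.
        return futures.stream()
                .map(CompletableFuture::join)
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<User> users = fetchUsers("collection_");
        System.out.println(users.size() + " users fetched"); // prints "8 users fetched"
    }
}
```

Nine calls are started and one fails, so eight users come back, in the same order as the futures were created.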

¹ Note that the allOf() call is still needed if you want the result itself to be a CompletableFuture<List<User>>, e.g.:

final CompletableFuture<List<User>> result =
        CompletableFuture.allOf(usersResult.stream().toArray(CompletableFuture[]::new))
                .thenApply(__ -> usersResult
                        .stream()
                        .map(CompletableFuture::join)
                        .filter(Objects::nonNull)
                        .collect(Collectors.toList()));
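
The same pattern can be pulled into a small generic helper. This is only a sketch: allAsList is a hypothetical name, not a JDK method, and unlike the snippet above it does not filter nulls. It assumes every future completes normally; if any of them fails, the combined future fails too.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfDemo {

    // Hypothetical helper: turns a list of futures into one future of a list.
    static <T> CompletableFuture<List<T>> allAsList(List<CompletableFuture<T>> futures) {
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]))
                // allOf() yields Void, so the values are read back from the
                // original futures; join() does not block here because they are all done.
                .thenApply(ignored -> futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList()));
    }

    public static void main(String[] args) {
        List<CompletableFuture<String>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> "a"),
                CompletableFuture.supplyAsync(() -> "b"),
                CompletableFuture.supplyAsync(() -> "c"));

        CompletableFuture<List<String>> combined = allAsList(futures);
        System.out.println(combined.join()); // prints [a, b, c]
    }
}
```

The caller can keep chaining on the combined future (thenApply, thenAccept, and so on) instead of blocking, which is the main reason to keep allOf() around.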