Combine a list of Observables and wait until all completed

Craig Russell · Feb 12, 2016 · Viewed 76.4k times

TL;DR: How do I convert Task.whenAll(List<Task>) to RxJava?

My existing code uses Bolts to build up a list of asynchronous tasks and waits until all of those tasks finish before performing other steps. Essentially, it builds up a List<Task> and returns a single Task which is marked as completed when all tasks in the list complete, as per the example on the Bolts site.

I'm looking to replace Bolts with RxJava and I'm assuming this method of building up a list of async tasks (size not known in advance) and wrapping them all into a single Observable is possible, but I don't know how.

I've tried looking at merge, zip, concat, etc., but can't get them to work on the List<Observable> that I'd be building up, as they all seem geared to working on just two Observables at a time, if I understand the docs correctly.

I'm trying to learn RxJava and am still very new to it so forgive me if this is an obvious question or explained in the docs somewhere; I have tried searching. Any help would be much appreciated.

Answer

MyDogTom · Feb 12, 2016

You can use flatMap when the set of tasks is composed dynamically. Something like this:

import java.util.List;
import rx.Observable;
import rx.schedulers.Schedulers;

public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) {
    return Observable.from(tasks)
            // run each task on the computation scheduler so they execute in parallel
            // (subscribeOn moves the work itself; observeOn would only move the emissions)
            .flatMap(task -> task.subscribeOn(Schedulers.computation()))
            // wait until all tasks have finished;
            // every Observable must emit onCompleted, otherwise you will wait forever
            .toList()
            // could apply smarter logic here, e.g. check that every result is true
            .map(results -> true);
}
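
For illustration, here is a minimal usage sketch, assuming RxJava 1.x (the `rx` package, which the `Observable.from` call above suggests). The class `WhenAllDemo` and the `square` task are hypothetical stand-ins for real async work, and this variant returns the collected results rather than a Boolean. It builds a list whose size is only known at runtime, then blocks until every task has completed.

```java
import java.util.ArrayList;
import java.util.List;
import rx.Observable;
import rx.schedulers.Schedulers;

public class WhenAllDemo {

    // Generic variant of whenAll: emits one List holding every task's results.
    static <T> Observable<List<T>> whenAll(List<Observable<T>> tasks) {
        return Observable.from(tasks)
                // subscribe to each task on the computation scheduler
                .flatMap(task -> task.subscribeOn(Schedulers.computation()))
                // emits a single list once all tasks have completed
                .toList();
    }

    // Hypothetical task: defers the computation until subscription.
    static Observable<Integer> square(int n) {
        return Observable.defer(() -> Observable.just(n * n));
    }

    // Builds the task list dynamically and waits for the combined result.
    static List<Integer> run() {
        List<Observable<Integer>> tasks = new ArrayList<>();
        for (int i = 1; i <= 5; i++) {
            tasks.add(square(i));
        }
        // toBlocking() is used here only to make the demo wait for the result
        return whenAll(tasks).toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println("results: " + run());
    }
}
```

Because flatMap merges results as they arrive, the order of the list is not guaranteed to match the order of the tasks.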

Another good example of parallel execution

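Note: I don't know your error-handling requirements, for example what should happen if only one task fails. With flatMap as used above, the first onError from any task terminates the whole sequence, so you should verify that this is the behavior you want.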
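
One way to explore the failure scenario is sketched below, again assuming RxJava 1.x. `ErrorPolicyDemo`, `failFast`, `skipFailures`, and `sampleTasks` are hypothetical names made up for this example. It contrasts the default behavior, where the first error ends the combined Observable, with a policy that drops failed tasks and keeps the remaining results.

```java
import java.util.Arrays;
import java.util.List;
import rx.Observable;

public class ErrorPolicyDemo {

    // Default policy: the first onError from any task terminates the whole sequence.
    static <T> Observable<List<T>> failFast(List<Observable<T>> tasks) {
        return Observable.from(tasks)
                .flatMap(task -> task)
                .toList();
    }

    // Alternative policy: a failed task contributes nothing; the others still count.
    static <T> Observable<List<T>> skipFailures(List<Observable<T>> tasks) {
        return Observable.from(tasks)
                .flatMap(task -> task.onErrorResumeNext(Observable.<T>empty()))
                .toList();
    }

    // Three hypothetical tasks; the second one fails.
    static List<Observable<Integer>> sampleTasks() {
        return Arrays.asList(
                Observable.just(1),
                Observable.<Integer>error(new IllegalStateException("task 2 failed")),
                Observable.just(3));
    }

    // Returns true if blocking on the fail-fast variant throws.
    static boolean failFastThrows() {
        try {
            failFast(sampleTasks()).toBlocking().single();
            return false;
        } catch (RuntimeException e) {
            return true;
        }
    }

    // Results of the tasks that did not fail.
    static List<Integer> survivors() {
        return skipFailures(sampleTasks()).toBlocking().single();
    }

    public static void main(String[] args) {
        System.out.println("fail fast throws: " + failFastThrows());
        System.out.println("survivors: " + survivors());
    }
}
```

If every task should run to completion before the error is reported, mergeDelayError may be a better fit than either policy shown here.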