RxJS: Wait for all observables in an array to complete (or error)

josh-sachs · Jan 19, 2017

I'm pushing observables into an array like so:

var tasks$ = [];
tasks$.push(Observable.timer(1000));
tasks$.push(Observable.timer(3000));
tasks$.push(Observable.timer(10000));

I want an Observable that emits when all tasks$ have completed. Keep in mind, in practice, tasks$ doesn't have a known number of Observables.

I've tried Observable.zip(tasks$).subscribe(), but this seems to fail when there is only one task, which leads me to believe that zip requires an even number of elements to work the way I would expect.

I've tried Observable.concat(tasks$).subscribe(), but the result of the concat operator just seems to be an array of observables, i.e. basically the same as the input. You can't even call subscribe on it.

In C# this would be akin to Task.WhenAll(). In ES6 promise it would be akin to Promise.all().
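For reference, the Promise.all() behavior being asked for can be sketched in plain JavaScript as follows. This is only an illustration of the desired semantics; `delay` is a hypothetical helper introduced for the example, and the timings are shortened.

```javascript
// delay is a hypothetical helper: it resolves with `ms` after `ms` milliseconds.
function delay(ms) {
  return new Promise(resolve => setTimeout(() => resolve(ms), ms));
}

// The array is built dynamically, so its length is not known up front.
const tasks = [];
tasks.push(delay(10));
tasks.push(delay(30));
tasks.push(delay(20));

// Promise.all resolves once every promise has resolved, with the results
// in input order, and rejects as soon as any one of them rejects.
const all = Promise.all(tasks);
all.then(results => console.log(results)); // logs [ 10, 30, 20 ]
```

The observable equivalent should behave the same way for an array of any length, including a single element.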

I've come across a number of SO questions but they all seem to deal with waiting on a known number of streams (e.g. mapping them together).

Answer

cartant · Jan 19, 2017

If you want to compose an observable that emits when all of the source observables complete, you can use forkJoin:

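import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/forkJoin';
import 'rxjs/add/observable/timer';
import 'rxjs/add/operator/first';

// forkJoin only emits after every source completes; first() ensures each task completes after one value.
var tasks$ = [];
tasks$.push(Observable.timer(1000).first());
tasks$.push(Observable.timer(3000).first());
tasks$.push(Observable.timer(10000).first());
// Emits a single array holding the last value of each task, in input order.
Observable.forkJoin(...tasks$).subscribe(results => { console.log(results); });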