How to force observables to execute in sequence?

Haoliang Yu · Apr 11, 2017

I am moving from the Promise world to the Observable world. One thing I usually do with Promises is chain a series of tasks so that they run in sequence. For example, I have three tasks: printLog1() prints 1 to the console, printLog23() prints 2 and 3, and printLog4() prints 4.

When I want to print 1-2-3-4, I write a promise chain like this:

printLog1()
  .then(() => {
    return printLog23(); // return the promise so the next step waits for it
  })
  .then(() => {
    return printLog4();
  });

Now I want the same behavior with Observables. I can rewrite the printLog() functions as Observables like this:

const printLog1 = Rx.Observable.of(1).map((i) => console.log(i));
const printLog23 = Rx.Observable.of(2, 3).map((i) => console.log(i));
const printLog4 = Rx.Observable.of(4).map((i) => console.log(i));

Now I have three observables that print different values to the console. How do I chain them so that they run in order and print 1-2-3-4?

Answer

martin · Apr 11, 2017

If you want to be sure that the order of emissions matches the order in which you specified the source Observables, you can use the concat or concatMap operators.

The concat* operators subscribe to an Observable only after the previous Observable completes (they work with Promises as well; see http://reactivex.io/rxjs/class/es6/MiscJSDoc.js~ObservableInputDoc.html).

In your case it'd look like the following:

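import { concat } from 'rxjs'; // Note: concat from 'rxjs' is not the same as concat from 'rxjs/operators'

// Runs printLog1, then printLog23, then printLog4; nothing executes until subscribe() is called.
concat(printLog1, printLog23, printLog4).subscribe();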

... or with concatMap if the request for one Promise depends on the response from the previous Promise:

import { concatMap } from 'rxjs/operators';

printLog1.pipe(
  concatMap(response => ...),
  concatMap(response => ...),
).subscribe();

... or, when the order doesn't matter, you can use merge, which subscribes to all Observables/Promises immediately and re-emits their results as they arrive:

import { merge } from 'rxjs';
merge(printLog1, printLog23, printLog4).subscribe();

Jan 2019: Updated for RxJS 6