RxJS Promise Composition (passing data)

low_ghost · Jan 10, 2016

I'm brand new to Rx and am having trouble finding documentation on composing promises so that data from the first promise is passed into the second, and so on. Here are three very basic promises. The calculations on the data aren't important; what matters is that something async has to be done using data from the previous promise.

 const p1 = () => Promise.resolve(1);
 const p2 = x => { const val = x + 1; return Promise.resolve(val); };
 const p3 = x => {
   const isEven = x => x % 2 === 0;
   return Promise.resolve(isEven(x));
 };

The traditional way to achieve the composition I'm talking about:

 p1().then(p2).then(p3).then(console.log);

My favorite implementation is Ramda's composeP and pipeP:

R.pipeP(p1, p2, p3, console.log)()
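For reference, the behaviour I'm after can be sketched in plain JavaScript. This is only a rough illustration of the idea, not Ramda's actual implementation: the `pipeP` below is a hypothetical stand-in that calls the first function with the given arguments and threads each resolved value into the next function through `then`.

```javascript
// Hypothetical helper (not Ramda's source): compose promise-returning
// functions left to right, passing each resolved value to the next function.
const pipeP = (...fns) => (...args) =>
  fns.slice(1).reduce(
    (acc, fn) => acc.then(fn),
    Promise.resolve(fns[0](...args)) // tolerate a first function that is not async
  );

// Same three promises as above, repeated so the sketch is self-contained.
const p1 = () => Promise.resolve(1);
const p2 = x => Promise.resolve(x + 1);
const p3 = x => Promise.resolve(x % 2 === 0);

const composed = pipeP(p1, p2, p3);

// 1 -> 2 -> isEven(2), so this logs true.
composed().then(result => console.log(result));
```

Nothing runs until `composed()` is called, which is the same laziness I'd expect from an Rx-based version.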

It seems likely that Rx can handle this kind of situation fluently. However, the closest thing I've found so far is this example from the RxJS documentation comparing RxJS with the async library (https://github.com/Reactive-Extensions/RxJS/blob/master/doc/mapping/async/comparing.md):

 var Rx = require('rx'),
     fs = require('fs'),
     path = require('path');
 var file = path.join(__dirname, 'file.txt'),
     dest = path.join(__dirname, 'file1.txt'),
     exists = Rx.Observable.fromCallback(fs.exists),
     rename = Rx.Observable.fromNodeCallback(fs.rename),
     stat = Rx.Observable.fromNodeCallback(fs.stat);
 exists(file)
     .concatMap(function (flag) {
         return flag ?
             rename(file, dest) :
             Rx.Observable.throw(new Error('File does not exist.'));
     })
     .concatMap(function () {
         return stat(dest);
     })
     .forEach(
         function (fsStat) {
             console.log(JSON.stringify(fsStat));
         },
         function (err) {
             console.log(err);
         }
     );

concatMap seems promising, but the above code looks pretty horrific. I was also having trouble with my own example: Rx.Observable.fromPromise(p1) won't work, since fromPromise expects a promise itself rather than a function returning one, and Rx.Observable.defer(p1) doesn't seem to pass parameters along the way the example does.

Thanks!

Similar question but without data passing: Chaining promises with RxJS

Answer

user3743222 · Jan 10, 2016

I did not read all of it, but if you want to achieve the same as p1().then(p2).then(p3).then(console.log);, with p1, p2 and p3 being functions that return promises, you could do something like this:

Rx.Observable.fromPromise(p1())
             .flatMap(function(p1_result){return p2(p1_result);})
             .flatMap(function(p2_result){return p3(p2_result);})

Or, more symmetrically:

 var chainedPromises$ = 
     Rx.Observable.just()
             .flatMap(p1)
             .flatMap(p2)
             .flatMap(p3);

Now, if you want to sequentially execute callbacks wrapped with fromCallback or fromNodeCallback, you could do something like:

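// file, dest, exists, rename and stat are defined as in the question.
// Named renameIfExists so that it does not shadow the wrapped fs.rename.
function renameIfExists (flag){
  return flag
          ? rename(file, dest).flatMap(function () { return Rx.Observable.just(dest); })
          : Rx.Observable.throw(new Error('File does not exist.'));
}

Rx.Observable.just(file)
             .flatMap(exists)
             .flatMap(renameIfExists)
             .flatMap(stat)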

The latter code is untested, so let me know whether it works. One last comment: this should work as long as each step produces only one value (as a promise does). If you had several files instead of one, flatMap could give you ordering issues (if order matters to you), because results are emitted as soon as they become available rather than in source order. In that case, you could use concatMap as a replacement.
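To make the ordering point concrete, here is a rough sketch that uses plain promises and timers as a stand-in for the two operators. It does not use Rx at all: `task`, `inputs`, `mergeStyle` and `concatStyle` are hypothetical names, and the delays are arbitrary. The merge-style version starts everything at once and collects results as they finish (roughly what flatMap does), while the concat-style version waits for each task before starting the next (roughly what concatMap does).

```javascript
// Hypothetical async task: resolves with its label after `ms` milliseconds.
const task = (label, ms) =>
  new Promise(resolve => setTimeout(() => resolve(label), ms));

// Three inputs whose tasks finish in a different order than they start.
const inputs = [['a', 30], ['b', 10], ['c', 20]];

// flatMap-like: start every task immediately, record results as they arrive.
const mergeStyle = () => {
  const out = [];
  return Promise.all(
    inputs.map(([label, ms]) => task(label, ms).then(v => out.push(v)))
  ).then(() => out);
};

// concatMap-like: start each task only after the previous one has finished.
const concatStyle = () =>
  inputs.reduce(
    (acc, [label, ms]) =>
      acc.then(out => task(label, ms).then(v => out.concat(v))),
    Promise.resolve([])
  );

mergeStyle().then(out => console.log('merge order:', out.join(', ')));   // b, c, a
concatStyle().then(out => console.log('concat order:', out.join(', '))); // a, b, c
```

The trade-off is the usual one: the concat-style version preserves source order but takes as long as the sum of all delays, since nothing runs concurrently.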