How to use the 'flatMap' operator conditionally? (Angular 2/RxJS)

Joel Raju · Mar 9, 2017 · Viewed 9.2k times

What I'm trying to achieve is to run a series of observables conditionally.

return observable.map(response => response)
       .flatMap(response1 => observable1(response1))
       .flatMap(response2 => observable2(response2))
       .flatMap(response3 => observable3(response3))

I need to check response1 and invoke the remaining observables only if needed; otherwise I need to return response1 and stop the chain, and likewise for each later response.

I've gone through the following SO questions, but they don't seem to answer my question:

Conditionally choose observable in RxJS

RXJS if with observable as conditional

Handle Error in RxJs flatMap stream and continue processing

I'm new to rxjs, so forgive me if the question seems too lame.

Any help will be appreciated.

Thanks

Answer

martin · Mar 9, 2017

This is like making multiple consecutive HTTP requests where each result depends on the previous one. This is where you want to use concatMap() instead of flatMap(), because it waits until the Observable returned from the callback function completes before moving on to the next one.

It's hard to tell exactly what you want to do from your description, but if you need to stop propagating the results (and avoid making unnecessary HTTP requests), have a look at the takeWhile() operator or the simpler filter() operator.

With filter():

return observable
    .map(response => response)
    // Drop the item here unless the check passes.
    .filter(response1 => response1.whatever === true)
    // Runs only for items that passed the filter above.
    .concatMap(response1 => observable1(response1))
    .filter(response2 => response2.whatever === true)
    .concatMap(response2 => observable2(response2))
    .filter(response3 => response3.whatever === true)

This simply won't propagate an item any further if it fails the filter test. However, the source observable can still emit later values, and those will go through the chain. In other words, the filter() operator doesn't complete the chain.
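To make the filter() behaviour concrete, here is a minimal sketch in plain JavaScript with no RxJS dependency. It models the stream as an array, which is an assumption made only for illustration, and the `responses` data is hypothetical. Array.prototype.filter applies the same kind of per-item test as the RxJS operator.

```javascript
// Hypothetical emissions from the source, modelled as a plain array.
const responses = [
  { id: 1, whatever: true },
  { id: 2, whatever: false },
  { id: 3, whatever: true },
];

// filter() drops only the items that fail the test;
// items that arrive after a failing one are still examined.
const passedFilter = responses
  .filter(r => r.whatever === true)
  .map(r => r.id);

console.log(passedFilter); // [ 1, 3 ]
```

Item 2 is dropped, but item 3 still gets through, which is the "doesn't complete the chain" behaviour described above.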

With takeWhile():

return observable
    .map(response => response)
    // Complete the whole chain as soon as a check fails.
    .takeWhile(response1 => response1.whatever === true)
    .concatMap(response1 => observable1(response1))
    .takeWhile(response2 => response2.whatever === true)
    .concatMap(response2 => observable2(response2))
    .takeWhile(response3 => response3.whatever === true)

If any of the takeWhile() predicates returns false, it completes the chain and no further values are emitted.
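The completing behaviour can be sketched in plain JavaScript, without RxJS. The `takeWhile` helper below is a hypothetical stand-in written for this example, not the RxJS operator itself, and the `emissions` array is made-up data modelling what the source emits.

```javascript
// Hypothetical array-based stand-in for the RxJS operator:
// keep items until the predicate first returns false, then stop for good.
function takeWhile(items, predicate) {
  const taken = [];
  for (const item of items) {
    if (!predicate(item)) {
      break; // models completion: nothing after this point is emitted
    }
    taken.push(item);
  }
  return taken;
}

// Hypothetical emissions from the source, modelled as a plain array.
const emissions = [
  { id: 1, whatever: true },
  { id: 2, whatever: false },
  { id: 3, whatever: true },
];

const passedTakeWhile = takeWhile(emissions, r => r.whatever === true)
  .map(r => r.id);

console.log(passedTakeWhile); // [ 1 ]
```

Item 3 would pass the test on its own, but it is never reached because the sequence already ended at item 2.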