I don't understand the purpose of mergeMap
at all. I have heard two "explanations:
merge
and map
" - nope (or I can't replicate this).Consider the following code:
var obs1 = new Rx.Observable.interval(1000);
var obs2 = new Rx.Observable.interval(1000);
//Just a merge and a map, works fine
obs1.merge(obs2).map(x=> x+'a').subscribe(
next => console.log(next)
)
//Who know what - seems to do the same thing as a plain map on 1 observable
obs1.mergeMap(val => Rx.Observable.of(val + `B`))
.subscribe(
next => console.log(next)
)
The last piece labelled "Who knows what" does nothing more than a map on obs1
- what's the point?
What does mergeMap
actually do? What is an example of a valid use case? (Preferably with some code)
Articles that didn't help me at all (mergeMap code from above is from one of these): 1, 2
tl;dr; mergeMap
is way more powerful than map
. Understanding mergeMap
is the necessary condition to access full power of Rx.
both mergeMap
and map
acts on a single stream (vs. zip
, combineLatest
)
both mergeMap
and map
can transform elements of a stream (vs. filter
, delay
)
cannot change size of the source stream (assumption: map
itself does not throw
); for each element from source exactly one mapped
element is emitted; map
cannot ignore elements (like for example filter
);
in case of the default scheduler the transformation happens synchronously; to be 100% clear: the source stream may deliver its elements asynchronously, but each next element is immediately mapped
and re-emitted further; map
cannot shift elements in time like for example delay
no restrictions on return values
id
: x => x
can change size of the source stream; for each element there might be arbitrary number (0, 1 or many) of new elements created/emitted
it offers full control over asynchronicity - both when new elements are created/emitted and how many elements from the source stream should be processed concurrently; for example assume source stream emitted 10 elements but maxConcurrency
is set to 2 then two first elements will be processed immediately and the rest 8 buffered; once one of the processed complete
d the next element from source stream will be processed and so on - it is bit tricky, but take a look at the example below
all other operators can be implemented with just mergeMap
and Observable
constructor
may be used for recursive async operations
return values has to be of Observable type (or Rx has to know how to create observable out of it - e.g. promise, array)
id
: x => Rx.Observable.of(x)
let array = [1,2,3]
fn map mergeMap
x => x*x [1,4,9] error /*expects array as return value*/
x => [x,x*x] [[1,1],[2,4],[3,9]] [1,1,2,4,3,9]
The analogy does not show full picture and it basically corresponds to .mergeMap
with maxConcurrency
set to 1. In such a case elements will be ordered as above, but in general case it does not have to be so. The only guarantee we have is that emission of new elements will be order by their position in the underlying stream. For example: [3,1,2,4,9,1]
and [2,3,1,1,9,4]
are valid, but [1,1,4,2,3,9]
is not (since 4
was emitted after 2
in the underlying stream).
mergeMap
:// implement .map with .mergeMap
Rx.Observable.prototype.mapWithMergeMap = function(mapFn) {
return this.mergeMap(x => Rx.Observable.of(mapFn(x)));
}
Rx.Observable.range(1, 3)
.mapWithMergeMap(x => x * x)
.subscribe(x => console.log('mapWithMergeMap', x))
// implement .filter with .mergeMap
Rx.Observable.prototype.filterWithMergeMap = function(filterFn) {
return this.mergeMap(x =>
filterFn(x) ?
Rx.Observable.of(x) :
Rx.Observable.empty()); // return no element
}
Rx.Observable.range(1, 3)
.filterWithMergeMap(x => x === 3)
.subscribe(x => console.log('filterWithMergeMap', x))
// implement .delay with .mergeMap
Rx.Observable.prototype.delayWithMergeMap = function(delayMs) {
return this.mergeMap(x =>
Rx.Observable.create(obs => {
// setTimeout is naive - one should use scheduler instead
const token = setTimeout(() => {
obs.next(x);
obs.complete();
}, delayMs)
return () => clearTimeout(token);
}))
}
Rx.Observable.range(1, 3)
.delayWithMergeMap(500)
.take(2)
.subscribe(x => console.log('delayWithMergeMap', x))
// recursive count
const count = (from, to, interval) => {
if (from > to) return Rx.Observable.empty();
return Rx.Observable.timer(interval)
.mergeMap(() =>
count(from + 1, to, interval)
.startWith(from))
}
count(1, 3, 1000).subscribe(x => console.log('count', x))
// just an example of bit different implementation with no returns
const countMoreRxWay = (from, to, interval) =>
Rx.Observable.if(
() => from > to,
Rx.Observable.empty(),
Rx.Observable.timer(interval)
.mergeMap(() => countMoreRxWay(from + 1, to, interval)
.startWith(from)))
const maxConcurrencyExample = () =>
Rx.Observable.range(1,7)
.do(x => console.log('emitted', x))
.mergeMap(x => Rx.Observable.timer(1000).mapTo(x), 2)
.do(x => console.log('processed', x))
.subscribe()
setTimeout(maxConcurrencyExample, 3100)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.1.1/Rx.min.js"></script>