I'm building an Angular 2 app and have two BehaviorSubjects that I want to logically combine into one subscription. I'm making two HTTP requests and want to fire an event when both of them come back. I'm looking at forkJoin vs. combineLatest. It seems that combineLatest will fire when either BehaviorSubject is updated, whereas forkJoin will fire only after all BehaviorSubjects are updated. Is this correct? There has to be a generally accepted pattern for this, isn't there?
EDIT
Here is an example of one of the BehaviorSubjects my Angular 2 component subscribes to:
export class CpmService {
public cpmSubject: BehaviorSubject<Cpm[]>;
constructor(private _http: Http) {
this.cpmSubject = new BehaviorSubject<Cpm[]>(new Array<Cpm>());
}
getCpm(id: number): void {
let params: URLSearchParams = new URLSearchParams();
params.set('Id', id.toString());
this._http.get('a/Url/Here', { search: params })
.map(response => <Cpm>response.json())
.subscribe(_cpm => {
this.cpmSubject.subscribe(cpmList => {
// Double-check we don't already have the cpm in the list; if we don't, push it and call next to propagate the new list to all subscribers
if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0) ) {
cpmList.push(_cpm);
this.cpmSubject.next(cpmList);
}
})
});
}
}
Here is a snippet of my component's subscription:
this._cpmService.cpmSubject.subscribe(cpmList => {
doSomeWork();
});
But instead of firing doSomeWork() on the single subscription, I want to fire doSomeWork() only when both cpmSubject and fooSubject have fired.
You could use the zip operator, which works similarly to combineLatest or forkJoin, but triggers only when both streams have emitted: http://reactivex.io/documentation/operators/zip.html
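The difference between zip and combineLatest is: zip will only trigger "in parallel" (it pairs the n-th value of one stream with the n-th value of the other), whereas combineLatest will trigger on any update, once every stream has emitted at least once, and emit the latest value of each stream.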
So, assuming the following 2 streams:
streamA => 1--2--3
streamB => 10-20-30
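with zip:
output => 1,10--2,20--3,30
with combineLatest (assuming each value of streamA arrives just before the matching value of streamB):
output => 1,10--2,10--2,20--3,20--3,30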
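The difference can also be modeled without RxJS. The following is a minimal sketch in plain TypeScript, not the library's implementation: `Emission`, `modelZip` and `modelCombineLatest` are hypothetical names, and the event order (a, b, a, b, a, b) is an assumption about how the two streams interleave.

```typescript
// Plain-TypeScript model of the two operators; this is NOT RxJS itself.
type Source = "a" | "b";

interface Emission {
  source: Source;
  value: number;
}

// zip pairs the n-th value of `a` with the n-th value of `b`.
function modelZip(events: Emission[]): number[][] {
  const queueA: number[] = [];
  const queueB: number[] = [];
  const out: number[][] = [];
  for (const e of events) {
    if (e.source === "a") {
      queueA.push(e.value);
    } else {
      queueB.push(e.value);
    }
    // Emit as soon as both queues hold a value that has not been paired yet.
    if (queueA.length > 0 && queueB.length > 0) {
      out.push([queueA.shift() as number, queueB.shift() as number]);
    }
  }
  return out;
}

// combineLatest emits on every update once both sources have emitted at least once.
function modelCombineLatest(events: Emission[]): number[][] {
  let latestA: number | undefined;
  let latestB: number | undefined;
  const out: number[][] = [];
  for (const e of events) {
    if (e.source === "a") {
      latestA = e.value;
    } else {
      latestB = e.value;
    }
    if (latestA !== undefined && latestB !== undefined) {
      out.push([latestA, latestB]);
    }
  }
  return out;
}

// Assumed interleaving: each value of `a` arrives just before the matching value of `b`.
const events: Emission[] = [
  { source: "a", value: 1 },
  { source: "b", value: 10 },
  { source: "a", value: 2 },
  { source: "b", value: 20 },
  { source: "a", value: 3 },
  { source: "b", value: 30 },
];

const zipped = modelZip(events);
const combined = modelCombineLatest(events);

console.log("zip:", JSON.stringify(zipped));
// zip: [[1,10],[2,20],[3,30]]
console.log("combineLatest:", JSON.stringify(combined));
// combineLatest: [[1,10],[2,10],[2,20],[3,20],[3,30]]
```

In this model zip produces three pairs while combineLatest produces five, because every update after the first pair triggers a new emission.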
Here is also a live example:
const a = new Rx.Subject();
const b = new Rx.Subject();
Rx.Observable.zip(a,b)
.subscribe(x => console.log("zip: " + x.join(", ")));
Rx.Observable.combineLatest(a,b)
.subscribe(x => console.log("combineLatest: " + x.join(", ")));
a.next(1);
b.next(10);
a.next(2);
b.next(20);
a.next(3);
b.next(30);
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
Another side note: never subscribe inside a subscribe. Do something like this instead:
this._http.get('a/Url/Here', { search: params })
.map(response => <Cpm>response.json())
.withLatestFrom(this.cpmSubject)
.subscribe(([_cpm, cpmList]) => {
if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0) ) {
cpmList.push(_cpm);
this.cpmSubject.next(cpmList);
}
});
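The duplicate check in the snippet above could also be factored into a small pure helper, so that the list held by the subject is not mutated in place. This is only a sketch: `addIfMissing` is a hypothetical name, and the `Cpm` interface below is an assumed minimal shape (just an `id`), not the real type from the question.

```typescript
// Assumed minimal stand-in for the Cpm type used in the question.
interface Cpm {
  id: number;
}

// Hypothetical helper: returns a new array with `item` appended when no element
// shares its id; otherwise returns the original array unchanged.
function addIfMissing(list: Cpm[], item: Cpm): Cpm[] {
  return list.some(cpm => cpm.id === item.id) ? list : [...list, item];
}

const before: Cpm[] = [{ id: 1 }];
const added = addIfMissing(before, { id: 2 });     // id 2 is new, so it is appended
const unchanged = addIfMissing(added, { id: 2 });  // id 2 already present, same array returned

console.log(before.length, added.length, unchanged === added);
// 1 2 true
```

Inside the subscribe callback this could be used as `const next = addIfMissing(cpmList, _cpm);` followed by `if (next !== cpmList) this.cpmSubject.next(next);`, which keeps the "only emit when something changed" behavior of the original check.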