How to combine multiple RxJS BehaviorSubjects

cobolstinks · Mar 16, 2017 · Viewed 11.9k times

I'm building an Angular2 app and have two BehaviorSubjects that I want to logically combine into one subscription. I'm making two HTTP requests and want to fire an event when both of them come back. I'm looking at forkJoin vs. combineLatest. It seems that combineLatest will fire when either BehaviorSubject is updated, whereas forkJoin will fire only after all BehaviorSubjects are updated. Is this correct? There has to be a generally accepted pattern for this, doesn't there?

EDIT
Here is an example of one of the BehaviorSubjects that my Angular2 component subscribes to:

export class CpmService {

    public cpmSubject: BehaviorSubject<Cpm[]>;

    constructor(private _http: Http) {
        this.cpmSubject = new BehaviorSubject<Cpm[]>(new Array<Cpm>());
    }

    getCpm(id: number): void {
        let params: URLSearchParams = new URLSearchParams();
        params.set('Id', id.toString());

        this._http.get('a/Url/Here', { search: params })
            .map(response => <Cpm>response.json())
            .subscribe(_cpm => {
                this.cpmSubject.subscribe(cpmList => {
                    // Double-check that the cpm is not already in the list; if it is missing, push it and call next() to propagate the new list to all subscribers
                    if (! (cpmList.filter((cpm: Cpm) => cpm.id === _cpm.id).length > 0) ) {
                        cpmList.push(_cpm);
                        this.cpmSubject.next(cpmList);
                    }
                })
            });
    }
}

Here is a snippet of my component's subscription:

  this._cpmService.cpmSubject.subscribe(cpmList => {
      doSomeWork();
  });

But instead of firing doSomeWork() on the single subscription, I want to fire doSomeWork() only when both cpmSubject and fooSubject have fired.

Answer

olsn · Mar 16, 2017

You could use the zip operator, which works similarly to combineLatest or forkJoin, but triggers only when both streams have emitted: http://reactivex.io/documentation/operators/zip.html

The difference between zip and combineLatest is: zip pairs values by position, so it triggers only once every stream has produced its next value, whereas combineLatest triggers on any update (after every stream has emitted at least once) and emits the latest value of each stream. So, assuming the following two streams:

streamA => 1--2--3
streamB => 10-20-30

with zip:

  • "1, 10"
  • "2, 20"
  • "3, 30"

with combineLatest:

  • "1, 10"
  • "2, 10"
  • "2, 20"
  • "3, 20"
  • "3, 30"

Here is a live example as well:

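// RxJS 5 API: relies on the global `Rx` loaded by the script tag below.
const a = new Rx.Subject();
const b = new Rx.Subject();

// zip pairs the n-th value of a with the n-th value of b.
Rx.Observable.zip(a,b)
  .subscribe(x => console.log("zip: " + x.join(", ")));
// combineLatest emits on every update once both subjects have emitted.
Rx.Observable.combineLatest(a,b)
  .subscribe(x => console.log("combineLatest: " + x.join(", ")));

a.next(1);
b.next(10);
a.next(2);
b.next(20);
a.next(3);
b.next(30);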
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
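
One caveat specific to BehaviorSubjects: each one replays its current value to every new subscriber, so with the subjects from the question the first pair that zip (or combineLatest) delivers is the two initial empty arrays, not the HTTP results. The sketch below is one way to handle that, assuming RxJS 6 or later (pipeable operators and the standalone zip and forkJoin functions rather than the Rx.Observable statics used above). The element type of fooSubject and the sample values are made up for illustration. It also shows why forkJoin is usually a poor fit for subjects: it emits only after every source completes, which a BehaviorSubject does not do unless complete() is called.

```typescript
import { BehaviorSubject, zip, forkJoin } from 'rxjs';
import { skip } from 'rxjs/operators';

// Hypothetical stand-ins for the question's cpmSubject and fooSubject.
const cpmSubject = new BehaviorSubject<number[]>([]);
const fooSubject = new BehaviorSubject<string[]>([]);

const zipLog: string[] = [];
const forkJoinLog: string[] = [];

// skip(1) drops the seed value that each BehaviorSubject replays on subscribe,
// so zip only pairs values pushed later via next().
zip(cpmSubject.pipe(skip(1)), fooSubject.pipe(skip(1)))
  .subscribe(([cpms, foos]) => zipLog.push(`${cpms.length} cpm, ${foos.length} foo`));

// forkJoin emits once, with the last value of each source,
// and only after every source has completed.
forkJoin([cpmSubject, fooSubject])
  .subscribe(([cpms, foos]) => forkJoinLog.push(`${cpms.length} cpm, ${foos.length} foo`));

cpmSubject.next([1]);     // zip still waits for fooSubject
fooSubject.next(['a']);   // zip emits its first pair here

// Nothing has reached the forkJoin subscriber yet.
const forkJoinCountBeforeComplete = forkJoinLog.length;

cpmSubject.complete();
fooSubject.complete();    // forkJoin emits only now

console.log(zipLog);                      // [ '1 cpm, 1 foo' ]
console.log(forkJoinCountBeforeComplete); // 0
console.log(forkJoinLog);                 // [ '1 cpm, 1 foo' ]
```

If the only goal is to react once both HTTP requests have returned, calling forkJoin on the two request observables themselves (which complete after a single response) may be simpler than going through the subjects.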


Another side note: never, ever subscribe inside a subscribe. Do something like this instead:

this._http.get('a/Url/Here', { search: params })
    .map(response => <Cpm>response.json())
    // Pair the response with the current value of cpmSubject.
    .withLatestFrom(this.cpmSubject)
    .subscribe(([_cpm, cpmList]) => {
        // Add the cpm only if its id is not already in the list.
        if (!cpmList.some((cpm: Cpm) => cpm.id === _cpm.id)) {
            cpmList.push(_cpm);
            this.cpmSubject.next(cpmList);
        }
    });
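
For readers on newer RxJS versions, the same idea can be written with pipe. This is only a minimal sketch under a few assumptions: RxJS 6 or later, a made-up Cpm shape with just an id, a hypothetical addCpm helper, and of(cpm) standing in for the mapped HTTP response. It also publishes a new array instead of pushing into the one that subscribers already hold, which is a design choice of this sketch and not part of the original code.

```typescript
import { BehaviorSubject, of } from 'rxjs';
import { withLatestFrom } from 'rxjs/operators';

// Hypothetical minimal shape; the question does not show the Cpm type.
interface Cpm { id: number; }

const cpmSubject = new BehaviorSubject<Cpm[]>([]);

// Snapshot of the ids in every list the subject emits, kept only for inspection.
const emitted: number[][] = [];
cpmSubject.subscribe(list => emitted.push(list.map(cpm => cpm.id)));

// Hypothetical helper: `of(cpm)` stands in for the mapped HTTP response.
function addCpm(cpm: Cpm): void {
  of(cpm)
    .pipe(withLatestFrom(cpmSubject))
    .subscribe(([newCpm, cpmList]) => {
      // Emit a new array only when the id is not in the current list.
      if (!cpmList.some(existing => existing.id === newCpm.id)) {
        cpmSubject.next([...cpmList, newCpm]);
      }
    });
}

addCpm({ id: 1 });
addCpm({ id: 2 });
addCpm({ id: 1 }); // duplicate id, so nothing new is emitted

console.log(emitted); // [ [], [ 1 ], [ 1, 2 ] ]
```

Because the source observable completes after its single value, each call leaves no lingering subscription on cpmSubject.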