Observable.forkJoin and array argument

Benjamin McFerren picture Benjamin McFerren · Feb 27, 2016 · Viewed 71.5k times · Source

In the Observables forkJoin documentation, it says that args can be an array but it doesn't list an example doing so:

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/forkjoin.md

I have tried a function similar to what I listed (below) but came up with an error:

:3000/angular2/src/platform/browser/browser_adapter.js:76 

EXCEPTION: TypeError: Observable_1.Observable.forkJoin is not a function

A sheared version of my function below:

processStuff( inputObject ) {
  let _self = this;

  return new Observable(function(observer) {
    let observableBatch = [];

    inputObject.forEach(function(componentarray, key) {
      observableBatch.push(_self.http.get(key + '.json').map((res: Response) => res.json()));
    });

    Observable.forkJoin(
      observableBatch
    // );
    ).subscribe(() => {
      observer.next();
      observer.complete();
    });

  });
}

The root of my question is related to a loop to end before proceeding as asked here: Angular2 Observable - how to wait for all function calls in a loop to end before proceeding?

But I haven't fully mastered the correct use of forkJoin with an array and the right syntax to do so.

I am very grateful for help you could offer.

NOTE: EXAMPLE OF THIRD FUNCTION THAT RETURNS AN OBSERVABLE

thirdFunction() {
  let _self = this;

  return Observable.create((observer) => {
  // return new Observable(function(observer) {
    ...

    observer.next(responseargs);
    observer.complete();
  });
}

processStuff(inputObject) {
  let _self = this;
  let observableBatch = [];

  inputObject.forEach((componentarray, key) => {
    observableBatch.push(_self.thirdFunction().map((res: Response) => res.json()));
  });

  return Observable.forkJoin(observableBatch);
}

elsewhere() {
  this.processStuff(inputObject)
    .subscribe()
}

Answer

Sasxa picture Sasxa · Feb 28, 2016

You need to import operators that are not loaded by default. That's what EXCEPTION Observable.xxxx is not a function usually means. You can either import all operators by adding complete rxjs to your bootstrap, for example:

import 'rxjs/Rx'

or by importing specific operators, in your case:

import 'rxjs/add/observable/forkJoin'

Another observation/suggestion about your code: try to stick with one syntax. You are mixing es5, es6, typescript... and while it is working it will only confuse you in the long run. Also, if you're just starting with Observables, try to avoid new Observable() and use creation operators instead;

processStuff( inputObject ) {
  let observableBatch = [];

  inputObject.forEach(( componentarray, key ) => {
    observableBatch.push( this.http.get( key + '.json').map((res: Response) => res.json()) );
  });

  return Observable.forkJoin(observableBatch);
}

elsewhere() {
  this.processStuff( inputObject )
    .subscribe()
}

Finally, refer to the correct documentation - Angular2 uses RxJS v5 and link you provided is for RxJS v4. Docs are still incomplete for v5, but you can find descriptions in many of the source files.