How to call forkJoin inside a pipe?

morynicz · Jul 3, 2019

TL;DR: Is it possible to use forkJoin inside the pipe of an observable?

Full story: I have a service that returns an Observable of an array of objects. For each of those objects I need to call another service, which also returns an Observable, and then apply a further operation to each of the results.

op1 ->[] ----> op2 --> op3
          \\-> op2 --> op3
           \-> op2 --> op3

My current solution is this:

    this.tournamentParticipantService.getNotAssigned(this.tournamentId).subscribe(
      (players: Player[]) => {
        let dict = {};
        players.forEach(player => {
          dict[player.id] = this.teamService.add(
            {
              id: 0,
              members: [],
              tournament: this.tournamentId,
              name: player.name + " " + player.surname
            })
            .pipe(
              map((team: Team) =>
                this.teamMemberService.add({ player: player.id, team: team.id })
              ));
        });
        forkJoin(dict).subscribe(result => console.log(result));
      });

I would like to get rid of the first subscribe and use pipe instead. The problem is that tutorials show forkJoin only as a source that is handed an object or array, never as part of a pipe.

Calling forkJoin from inside map

.pipe(
    map(value => forkJoin(value))
)

returns Observable<Observable<resolved forkJoin arguments>>, which I would probably have to subscribe to recursively. That does not seem like a good way.
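To make the nesting concrete, here is a minimal sketch. It assumes RxJS 6-style imports and uses `of` as a hypothetical stand-in for the real service calls, so everything emits synchronously. It only illustrates that the subscriber receives an inner Observable rather than the joined values:

```typescript
import { forkJoin, isObservable, of } from "rxjs";
import { map } from "rxjs/operators";

// Hypothetical source: emits an array of observables, like the mapped players list.
const source$ = of([of("a"), of("b")]);

let received: unknown;
source$
  .pipe(map(inner => forkJoin(inner))) // map wraps, it does not subscribe
  .subscribe(value => (received = value));

// The value delivered to subscribe is itself an Observable, not ["a", "b"].
console.log(isObservable(received));
```
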

Putting forkJoin inside the pipe without arguments

this.tournamentParticipantService.getNotAssigned(this.tournamentId).pipe(
      map((players: Player[]) => players.map(
        (player: Player) => this.teamService.add({ id: 0, members: [], tournament: this.tournamentId, name: player.name + " " + player.surname })
          .pipe(
            map((team: Team) => {
              let pipe = new JsonPipe();
              console.log("team: " + pipe.transform(team) + " player: " + pipe.transform(player));
              this.teamMemberService.add({ player: player.id, team: team.id });
            })))),
      forkJoin
    ).subscribe((result: [[Observable<void>]]) => {
      console.log(result)
      result[0].forEach(element => {
        element.subscribe(res => console.log(res));
      });
    });

ends with a weird, tangled structure of observables. This does not seem to be a good way either.

Is it even possible to use forkJoin from inside a pipe?

Answer

John · Jul 3, 2019

I think the thing you're missing is that forkJoin returns an observable, so the standard map isn't what you're looking for. You need one of the flattening map operators, which subscribe to the returned observable and emit its result (e.g. switchMap, exhaustMap, mergeMap):

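.pipe(
    // switchMap subscribes to the observable returned by forkJoin and emits its result
    switchMap(value => forkJoin([getThingOne(value), getThingTwo(value)]))
)
.subscribe(([thing1, thing2]) => {
    // thing1 and thing2 are the last values emitted by each inner observable
});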
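Applied to the scenario in the question, the same idea might look roughly like the sketch below. The service stubs (`getNotAssigned`, `addTeam`, `addTeamMember`) and the interfaces are hypothetical stand-ins built on `of`, not the asker's real services, and the array form of forkJoin is used instead of the dictionary form. Note that the inner call also needs a flattening operator rather than map; otherwise forkJoin would emit observables instead of results.

```typescript
import { forkJoin, Observable, of } from "rxjs";
import { switchMap } from "rxjs/operators";

interface Player { id: number; name: string; surname: string; }
interface Team { id: number; name: string; }
interface TeamMember { player: number; team: number; }

// Hypothetical stubs standing in for the question's services (assumptions).
const getNotAssigned = (): Observable<Player[]> =>
  of([
    { id: 1, name: "Ann", surname: "Lee" },
    { id: 2, name: "Bob", surname: "Ray" },
  ]);
let nextTeamId = 100;
const addTeam = (name: string): Observable<Team> => of({ id: nextTeamId++, name });
const addTeamMember = (member: TeamMember): Observable<TeamMember> => of(member);

const results: TeamMember[][] = [];

getNotAssigned()
  .pipe(
    // op1: flatten the forkJoin over all players into the outer stream
    switchMap(players =>
      forkJoin(
        players.map(player =>
          addTeam(`${player.name} ${player.surname}`).pipe(
            // op2 -> op3: flatten the inner call as well instead of using map
            switchMap(team => addTeamMember({ player: player.id, team: team.id }))
          )
        )
      )
    )
  )
  .subscribe(members => results.push(members));

console.log(results);
```

One thing to keep in mind with this shape: forkJoin over an empty array completes without emitting, so the subscribe callback is never called when there are no unassigned players.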