RxJS 6 - Cancel / End a Pipe

RookieMcRookie · Jul 23, 2018 · Viewed 16.3k times

I'm working with the new version of RxJS 6, and the pipe method in particular. I'm currently using pipe to take the results of an API call and pass them to a series of additional tasks.

It all works great, but I can't seem to find a way to cancel or end a pipe if I encounter an issue. For example, I'm using the tap operator to check whether the value is null. I then throw an error, but the pipe still appears to move on to the next task, in this case concatMap.

So, how do you end or cancel a pipe prematurely? Thanks in advance.

getData(id: String): Observable<any[]> {
  return this.http.get<any>(`${this.baseUrl}/path/${id}`).pipe(
    tap(evt => {
      if (evt == null) {
        return throwError(new Error("No data found..."));
      }
    }),
    concatMap(
      evt =>
        <Observable<any[]>>(
          this.http
            .get<any[]>(`${this.baseUrl}/path/relatedby/${evt.child_id}`)
            .map(res => ({ "response1": evt, "response2": res }))
        )
    ),
    retry(3),
    catchError(this.handleError("getData", []))
  );
}

Answer

DeborahK · Jul 24, 2018

I tried the basic concept from what you have with this stackblitz and it worked. It cancelled the remaining operations. See the link below.

https://stackblitz.com/edit/angular-4ctwsd?file=src%2Fapp%2Fapp.component.ts

The differences I see between your code and mine are that I used throw and not throwError (is that something you wrote?), and that I'm just throwing the error, not returning a thrown error.

Here is the code for reference:

import { Component } from '@angular/core';
import { of } from 'rxjs';
import { map, catchError, tap, retry } from 'rxjs/operators';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent {
  name = 'Angular 6';

  constructor() {
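    of('a', 'b', 'c', 'd')
      .pipe(
        map(x => {
          if (x === 'c') {
            // Throwing (not returning) errors the stream; 'd' is never emitted.
            throw 'An error has occurred';
          }
          return x;
        }),
        // Not called for 'c', because the error skips the next callbacks.
        tap(x => console.log('In tap: ', x)),
        // Resubscribes to the source up to 3 times, so 'a' and 'b' repeat.
        retry(3),
        // Runs once the retries are exhausted.
        catchError(() => of('caught error!'))
      )
      .subscribe(x => console.log(x));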
  }
}
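
To make the throw-versus-return difference concrete, here is a minimal sketch, not the asker's actual service. It assumes `of(...)` as a stand-in for the HTTP calls, and `fetchRelated`, `collect`, and the three variant functions are hypothetical names made up for illustration. As far as I can tell, tap discards whatever its callback returns, so a returned `throwError(...)` Observable is never subscribed to and the value carries on to concatMap. Throwing inside tap, or returning `throwError(...)` from concatMap (which does subscribe to what it is given), errors the stream and skips the follow-up.

```typescript
import { Observable, of, throwError } from 'rxjs';
import { catchError, concatMap, tap } from 'rxjs/operators';

type Evt = { child_id: number } | null;

// Hypothetical stand-in for the follow-up HTTP request in the question.
function fetchRelated(evt: Evt): Observable<string> {
  return of(evt === null ? 'follow-up ran with null' : `related to ${evt.child_id}`);
}

// Variant 1: return throwError(...) from tap, as in the question.
// tap ignores the return value, so nothing errors and concatMap still runs.
function returnInTap(source: Observable<Evt>): Observable<string> {
  return source.pipe(
    tap(evt => {
      if (evt == null) {
        return throwError(new Error('No data found...')); // ignored by tap
      }
      return undefined;
    }),
    concatMap(fetchRelated),
    catchError((err: Error) => of(`caught: ${err.message}`))
  );
}

// Variant 2: throw inside tap. The stream errors, concatMap is skipped,
// and catchError receives the error.
function throwInTap(source: Observable<Evt>): Observable<string> {
  return source.pipe(
    tap(evt => {
      if (evt == null) {
        throw new Error('No data found...');
      }
    }),
    concatMap(fetchRelated),
    catchError((err: Error) => of(`caught: ${err.message}`))
  );
}

// Variant 3: return throwError(...) from concatMap, which subscribes to
// the returned Observable and therefore propagates its error.
function throwErrorInConcatMap(source: Observable<Evt>): Observable<string> {
  return source.pipe(
    concatMap((evt): Observable<string> =>
      evt == null ? throwError(new Error('No data found...')) : fetchRelated(evt)
    ),
    catchError((err: Error) => of(`caught: ${err.message}`))
  );
}

// of(...) emits synchronously, so the results can be collected right away.
function collect(obs: Observable<string>): string[] {
  const out: string[] = [];
  obs.subscribe(v => out.push(v));
  return out;
}

console.log(collect(returnInTap(of<Evt>(null))));           // [ 'follow-up ran with null' ]
console.log(collect(throwInTap(of<Evt>(null))));            // [ 'caught: No data found...' ]
console.log(collect(throwErrorInConcatMap(of<Evt>(null)))); // [ 'caught: No data found...' ]
```

One thing to keep in mind with the original pipe: retry(3) sits after the check, so an error thrown there would resubscribe to the source and repeat the first request before catchError ever sees it. If a null result should not be retried, moving the check below retry is probably what you want.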