How can I avoid multiple nested subscriptions using RxJS operators?

Benny1158 · Mar 29, 2019 · Viewed 7.4k times

I am working on a file encryption and upload class in Angular. Many of these operations are asynchronous, so the methods I wrote return RxJS Observables.

// 1.
private prepareUpload(file): Observable<T>;

// 2.
private encryptData(data, filekey): Observable<T>;

// 3.
private uploadEncryptedData(formData, token, range): Observable<T>;

// 4.
private completeUpload(updatedFilekey, token): Observable<T>;

I want to encapsulate this logic in a public upload(file) method. I ended up using nested subscriptions. It works, but I know that nesting subscriptions is an anti-pattern in RxJS for several reasons. Here is a simplified version of the code:

public upload(file) {
  const gen = this.indexGenerator(); // generator function

  this.prepareUpload(file).subscribe(values => {
    const [response, filekey, data] = values;

    this.encryptData(data, filekey).subscribe(encryptedDataContainer => {
      const formData = this.prepareEncDataUpload(encryptedDataContainer.data, file.name);
      const range = this.getRange(file.size, gen.next().value);

      this.uploadEncryptedData(formData, response.token, range).subscribe(() => {
        if (range.isFinalPart) {
          this.completeUpload(encryptedDataContainer.updatedFilekey, response.token).subscribe(console.log);
        }
      });
    });
  });
}

I tried to clean up this code with combinations of several RxJS operators, but without success. My goal is to avoid nested subscriptions and instead have the public upload() method return a single Observable that emits when the whole workflow is completed.

Thanks!

Answer

Muhammad Ahsan Ayaz · Mar 29, 2019

You can chain your calls using the mergeMap and filter operators from RxJS. You will need a few function-level variables to carry values from one step of the chain to the next.

import { EMPTY } from 'rxjs';
import { mergeMap, filter, catchError } from 'rxjs/operators';

public upload(file) {
  const gen = this.indexGenerator(); // generator function
  // Function-level variables shared between the chained operators.
  let range, token, updatedFilekey;
  this.prepareUpload(file)
    .pipe(
      mergeMap((values) => {
        const [response, filekey, data] = values;
        token = response.token;
        return this.encryptData(data, filekey);
      }),
      mergeMap(encryptedDataContainer => {
        const formData = this.prepareEncDataUpload(encryptedDataContainer.data, file.name);
        range = this.getRange(file.size, gen.next().value);
        // Keep the updated file key for the final completeUpload() call.
        updatedFilekey = encryptedDataContainer.updatedFilekey;

        return this.uploadEncryptedData(formData, token, range);
      }),
      // Continue only once the final part has been uploaded.
      filter(() => !!range.isFinalPart),
      mergeMap(() => {
        return this.completeUpload(updatedFilekey, token);
      }),
      catchError((error) => {
        console.log(error);
        // handle the error accordingly.
        // catchError must return an Observable; EMPTY completes without emitting.
        return EMPTY;
      })
    )
    .subscribe(() => {
      console.log('success');
    });
}
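
The code above still subscribes inside upload() and relies on function-level variables, while the question asks for a single Observable to be returned. One possible way to get there is to nest the inner pipe() calls (not the subscriptions), so that earlier values such as the token stay in closure scope. The following is only a minimal sketch: the four functions are hypothetical stand-ins for the private methods in the question, the interfaces are assumed shapes, and range is passed in as a parameter instead of being computed from the generator.

```typescript
import { Observable, of } from 'rxjs';
import { mergeMap, filter } from 'rxjs/operators';

// Assumed shapes, for illustration only; the real types are not shown in the question.
interface PrepareResult { token: string; filekey: string; data: string; }
interface EncryptedContainer { data: string; updatedFilekey: string; }
interface Range { isFinalPart: boolean; }

// Hypothetical stand-ins for the four private methods; each emits exactly once.
const prepareUpload = (file: { name: string }): Observable<PrepareResult> =>
  of({ token: 'token-1', filekey: 'key-1', data: 'plain:' + file.name });
const encryptData = (data: string, filekey: string): Observable<EncryptedContainer> =>
  of({ data: 'enc(' + data + ')', updatedFilekey: filekey + '-updated' });
const uploadEncryptedData = (data: string, token: string, range: Range): Observable<string> =>
  of('uploaded ' + data + ' with ' + token + ', final=' + range.isFinalPart);
const completeUpload = (updatedFilekey: string, token: string): Observable<string> =>
  of('completed ' + updatedFilekey + ' with ' + token);

// Returns one Observable for the whole workflow; nothing runs until the caller subscribes.
function upload(file: { name: string }, range: Range): Observable<string> {
  return prepareUpload(file).pipe(
    mergeMap(({ token, filekey, data }) =>
      encryptData(data, filekey).pipe(
        mergeMap(container =>
          uploadEncryptedData(container.data, token, range).pipe(
            // Only continue to completeUpload() after the final part.
            filter(() => range.isFinalPart),
            mergeMap(() => completeUpload(container.updatedFilekey, token))
          )
        )
      )
    )
  );
}

// The caller decides when to subscribe and how to handle errors.
upload({ name: 'report.pdf' }, { isFinalPart: true })
  .subscribe(result => console.log(result)); // logs "completed key-1-updated with token-1"
```

Because every stand-in emits exactly once, mergeMap, concatMap and switchMap would behave the same in this sketch; with sources that emit several times, the choice of operator starts to matter. Error handling is left to the subscriber here, which can add catchError in its own pipe() if needed.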