Proper usage of RxSwift to chain requests, flatMap or something else?

Wujo picture Wujo · Dec 1, 2016 · Viewed 10.6k times · Source

First of all, I'm new to rxswift so I guess the answer is obvious however at the moment I can't find solution by myself.

I have two functions:

func downloadAllTasks() -> Observable<[Task]>
func getTaskDetails(taskId: Int64) -> Observable<TaskDetails>

First one is downloading the list of Task objects using network request, second one downloading task details for sepcific task (using it's id)

What I want of achieve is to download all tasks and then for each task I want to download its details and subscribe for the event fired when all tasks details are ready.

So I guess I should subscribe somehow to Observable<[TaskDetails]> but I don't know how to do it.

        downloadAllTasks()
        .flatMap{
            ... // flatMap? something else?
        }
        .subscribe(
            onNext: { details in
                print("tasks details: \(details.map{$0.name})")
        })
        .addDisposableTo(disposeBag)

//EDIT

Thanks to Silvan Mosberger answer I'm much closer to the solution. One problem left. Now I have something like this:

    downloadAllTasks()
        .flatMap{ Observable.from($0) } 
        .map{ $0.id }
        .flatMap{ [unowned self] id in
            self.getTaskDetails(taskId: id).catchError{ error in
                print("$$$ Error downloading task \(id)")
                return .empty()
            }
        }
        .do(onNext: { _ in
            print(" $$$ single task details downloaded")
        } )
        .toArray()
        .debug("$$$ task details array debug", trimOutput: false)
        .subscribe({ _ in
            print("$$$ all tasks downloaded")
        })
        .addDisposableTo(disposeBag)

The output is

$$$ task details array debug -> subscribed
$$$ single task details downloaded
$$$ single task details downloaded
$$$ single task details downloaded

There are 3 tasks available so as you can se all of them are downloaded properly however for some reason the result of toArray() - (Observable<[TaskDetails]>) doesn't produce "onNext" once all task details are ready.

// Edit once more

Ok, I'm adding simplified version of functions providing observables, maybe it will help something

func downloadAllTasks() -> Observable<Task> {
    return Observable.create { observer in

            //... network request to download tasks
            //...

            for task in tasks {
                observer.onNext(task)
            }
            observer.onCompleted()

        return Disposables.create()
    }
}


func getTaskDetails(id: Int64) -> Observable< TaskDetails >  {
    return Observable.create { observer in

        //... network request to download task details
            //...

        observer.onNext(taskDetails)

        return Disposables.create()
    }
}

Answer

Silvan Mosberger picture Silvan Mosberger · Dec 1, 2016

With RxSwift you want to use Observables whenever possible, therefore I recommend you to refactor the downloadAllTasks method to return an Observable<Task>. This should be fairly trivial by just looping through the elements instead of emitting the array directly:

// In downloadAllTasks() -> Observable<Task>
for task in receivedTasks {
    observable.onNext(task)
}

If this is not possible for whatever reason, there is also an operator for that in RxSwift:

// Converts downloadAllTasks() -> Observable<[Task]> to Observable<Task>
downloadAllTasks().flatMap{ Observable.from($0) }

In the following code I will be using the refactored downloadAllTasks() -> Observable<Task> method because it's the cleaner approach.

You can then map your tasks to get their id (assuming your Task type has the id: Int64 property) and flatMap with the downloadAllTasks function to get an Observable<TaskDetails>:

let details : Observable<TaskDetails> = downloadAllTasks()
    .map{ $0.id }
    .flatMap(getTaskDetails)

Then you can use the toArray() operator to gather the whole sequence and emit an event containing all elements in an array:

let allDetails : Observable<[TaskDetails]> = details.toArray()

In short, without type annotations and sharing the tasks (so you won't download them only once):

let tasks = downloadAllTasks().share()

let allDetails = tasks
    .map{ $0.id }
    .flatMap(getTaskDetails)
    .toArray()

EDIT: Note that this Observable will error when any of the detail downloads encounters an error. I'm not exactly sure what's the best way to prevent this, but this does work:

let allDetails = tasks
    .map{ $0.id }
    .flatMap{ id in
        getTaskDetails(id: id).catchError{ error in
            print("Error downloading task \(id)")
            return .empty()
        }
    }
    .toArray()

EDIT2: It's not gonna work if your getTaskDetails returns an observable that never completes. Here is a simple reference implementation of getTaskDetails (with String instead of TaskDetails), using JSONPlaceholder:

func getTaskDetails(id: Int64) -> Observable<String> {
    let url = URL(string: "https://jsonplaceholder.typicode.com/posts/\(id)")!
    return Observable.create{ observer in
        let task = URLSession.shared.dataTask(with: url) { data, response, error in
            if let error = error {
                observer.onError(error)
            } else if let data = data, let result = String(data: data, encoding: .utf8) {
                observer.onNext(result)
                observer.onCompleted()
            } else {
                observer.onError("Couldn't get data")
            }
        }
        task.resume()

        return Disposables.create{
            task.cancel()
        }
    }
}