RX: Run Zipped Observables in parallel?

spierce7 picture spierce7 · Jan 18, 2014 · Viewed 10.4k times · Source

So I'm playing around with RX (really cool), and I've been converting my api that accesses a sqlite database in Android to return observables.

So naturally one of the problems I started to try to solve is, "What if I want to make 3 API calls, get the results, and then do some processing once they are all finished?"

It took me an hour or 2, but I eventually found the Zip Functionality and it helps me out handily:

    Observable<Integer> one = getNumberedObservable(1);
    Observable<Integer> two = getNumberedObservable(2);
    Observable<Integer> three = getNumberedObservable(3);

    Observable.zip(one, two, three, new Func3<Integer, Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer arg0, Integer arg1, Integer arg2) {
            System.out.println("Zip0: " + arg0);
            System.out.println("Zip1: " + arg1);
            System.out.println("Zip2: " + arg2);
            return arg0 + arg1 + arg2;
        }
    }).subscribe(new Action1<Integer>() {
        @Override
        public void call(Integer arg0) {
            System.out.println("Zipped Result: " + arg0);
        }
    });

public static Observable<Integer> getNumberedObservable(final int value) {
    return Observable.create(new OnSubscribeFunc<Integer>() {
        @Override
        public Subscription onSubscribe(Observer<? super Integer> observer) {
            observer.onNext(value);
            observer.onCompleted();
            return Subscriptions.empty();
        }
    });
}

Great! So that's cool.

So when I zip up the 3 observables they run in serial. What if I want them all to run in parallel at the same time so I end up getting the results faster? I've played around with a few things, and even tried reading some of the original RX stuff people have written in C#. I'm sure there is a simple answer. Can anyone point me in the right direction? What is the proper way to do this?

Answer

James World picture James World · Jan 18, 2014

zip does run the observables in parallel - but it also subscribes to them serially. Since your getNumberedObservable is completing in the subscription method it gives the impression of running serially, but there is in fact no such limitation.

You can either try with some long running Observables that outlive their subscription logic, such as timer, or use the subscribeOn method to subscribe asynchronously to each stream passed to zip.