RxJava: how to compose multiple Observables with dependencies and collect all results at the end?

Steve Kehlet · Mar 7, 2014 · Viewed 35.8k times

I'm learning RxJava and, as a first experiment, I'm trying to rewrite the first run() method in this code (cited on Netflix's blog as a problem RxJava can help solve) to make it more asynchronous, i.e. so that it doesn't block on the result of the first Future (f1.get()) before proceeding with the rest of the code.

f3 depends on f1. I see how to handle this; flatMap seems to do the trick:

Observable<String> f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA()))
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String s) {
            return Observable.from(executor.submit(new CallToRemoteServiceC(s)));
        }
    });

Next, f4 and f5 depend on f2. I have this:

final Observable<Integer> f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB()))
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer i) {
            Observable<Integer> f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i)));
            Observable<Integer> f5Observable = Observable.from(executor.submit(new CallToRemoteServiceE(i)));
            return Observable.merge(f4Observable, f5Observable);
        }
    });

Which starts to get weird (merging them probably isn't what I want...) but allows me to do this at the end, which is not quite what I want:

f3Observable.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println("Observed from f3: " + s);
        f4And5Observable.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer i) {
                System.out.println("Observed from f4 and f5: " + i);
            }
        });
    }
});

That gives me:

Observed from f3: responseB_responseA
Observed from f4 and f5: 140
Observed from f4 and f5: 5100

which is all the numbers, but unfortunately I get the results in separate invocations, so I can't quite replace the final println in the original code:

System.out.println(f3.get() + " => " + (f4.get() * f5.get()));

I don't understand how to get access to both those return values on the same line. I think there's probably some functional programming fu I'm missing here. How can I do this? Thanks.

Answer

user3407713 · Aug 13, 2014

It looks like all you really need is a bit more encouragement and perspective on how Rx is used. I'd suggest reading more of the documentation and studying the marble diagrams (I know they're not always useful). I also suggest looking into the lift() function and operators.

  • The entire point of an Observable is to combine data flow and data manipulation into a single object
  • The point of calls to map, flatMap and filter is to manipulate the data in your data flow
  • The point of merges is to combine data flows
  • The point of operators is to allow you to disrupt a steady stream of observables and define your own operations on a data flow. For example, I coded a moving average operator that sums up n doubles in an Observable of doubles and returns a stream of moving averages. The code literally looked like this:

    Observable<Double> movingAverage = Observable.from(mDoublesArray).lift(new MovingAverageOperator(frameSize));

You'll be relieved to know that many of the filtering methods you take for granted use lift() under the hood.
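The MovingAverageOperator itself isn't shown here. As a rough illustration of the arithmetic such an operator might apply to each incoming item, here is a plain-Java sketch with no RxJava dependency. The class MovingAverage and its methods next and of are hypothetical names, and skipping output until the first full window is an assumption, not something stated above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper: the sliding-window logic a moving-average operator could use.
class MovingAverage {
    private final int frameSize;
    private final Deque<Double> window = new ArrayDeque<>();
    private double sum = 0.0;

    MovingAverage(int frameSize) {
        if (frameSize <= 0) {
            throw new IllegalArgumentException("frameSize must be positive");
        }
        this.frameSize = frameSize;
    }

    // Feed one value. Returns the average of the last frameSize values,
    // or null while the window is still filling (assumption: emit nothing yet).
    Double next(double value) {
        window.addLast(value);
        sum += value;
        if (window.size() > frameSize) {
            sum -= window.removeFirst(); // drop the oldest value from the running sum
        }
        return window.size() == frameSize ? sum / frameSize : null;
    }

    // Convenience: run a whole array through the window and collect the averages.
    static List<Double> of(double[] values, int frameSize) {
        MovingAverage avg = new MovingAverage(frameSize);
        List<Double> out = new ArrayList<>();
        for (double v : values) {
            Double a = avg.next(v);
            if (a != null) {
                out.add(a);
            }
        }
        return out;
    }
}
```

For example, MovingAverage.of(new double[]{1, 2, 3, 4, 5}, 3) yields [2.0, 3.0, 4.0]. Inside a real operator, next would presumably be called from onNext, with any non-null result passed downstream.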

With that said, all it takes to merge multiple dependencies is:

  • changing all incoming data to a standard data type using map or flatMap
  • merging the standard data types into a single stream
  • using custom operators if one object needs to wait on another, or if you need to order data in the stream. Caution: this approach will slow the stream down
  • using toList() or subscribe() to collect all of that data
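For the concrete case in the question (f3 depends on f1, f4 and f5 depend on f2, and all three results are needed in one expression), what's missing is a step that pairs results instead of interleaving them. In RxJava that is what the zip operator is for. The sketch below shows the same dependency shape using only the JDK's CompletableFuture, so it runs without RxJava on the classpath: thenCompose stands in for flatMap and thenCombine for zip. The service methods are hypothetical stand-ins whose return values are picked to reproduce the numbers printed in the question; they are not the original remote calls.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch only: JDK CompletableFuture used as an analogue of the Observable pipeline.
class ComposeDependentCalls {

    // Hypothetical stand-ins for the remote services (values are assumptions
    // chosen to match the output quoted in the question).
    static String serviceA() { return "responseA"; }
    static Integer serviceB() { return 100; }
    static String serviceC(String fromA) { return "responseB_" + fromA; }
    static Integer serviceD(Integer fromB) { return 40 + fromB; }
    static Integer serviceE(Integer fromB) { return 5000 + fromB; }

    static String run(ExecutorService executor) {
        // f3 depends on f1: thenCompose chains a dependent async call, like flatMap.
        CompletableFuture<String> f1 =
                CompletableFuture.supplyAsync(ComposeDependentCalls::serviceA, executor);
        CompletableFuture<String> f3 =
                f1.thenCompose(a -> CompletableFuture.supplyAsync(() -> serviceC(a), executor));

        // f4 and f5 both depend on f2 and may run concurrently once f2 completes.
        CompletableFuture<Integer> f2 =
                CompletableFuture.supplyAsync(ComposeDependentCalls::serviceB, executor);
        CompletableFuture<Integer> f4 = f2.thenApplyAsync(ComposeDependentCalls::serviceD, executor);
        CompletableFuture<Integer> f5 = f2.thenApplyAsync(ComposeDependentCalls::serviceE, executor);

        // thenCombine hands both results to one function, the role zip plays in RxJava.
        CompletableFuture<Integer> product = f4.thenCombine(f5, (d, e) -> d * e);

        // Only this final join blocks; everything above is wired up without waiting.
        return f3.thenCombine(product, (s, p) -> s + " => " + p).join();
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            System.out.println(run(executor));
        } finally {
            executor.shutdown();
        }
    }
}
```

With these stand-in values the program prints responseB_responseA => 714000, i.e. all three results end up on the same line. The key design point is that the combining step receives both values as arguments, whereas merge delivers them one at a time in separate invocations.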