I'm learning RxJava and, as my first experiment, trying to rewrite the code in the first run() method in this code (cited on Netflix's blog as a problem RxJava can help solve) to improve its asynchronicity using RxJava, i.e. so it doesn't wait for the result of the first Future (f1.get()) before proceeding on to the rest of the code.
f3 depends on f1. I see how to handle this; flatMap seems to do the trick:
Observable<String> f3Observable = Observable.from(executor.submit(new CallToRemoteServiceA()))
    .flatMap(new Func1<String, Observable<String>>() {
        @Override
        public Observable<String> call(String s) {
            return Observable.from(executor.submit(new CallToRemoteServiceC(s)));
        }
    });
Next, f4 and f5 depend on f2. I have this:
final Observable<Integer> f4And5Observable = Observable.from(executor.submit(new CallToRemoteServiceB()))
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(Integer i) {
            Observable<Integer> f4Observable = Observable.from(executor.submit(new CallToRemoteServiceD(i)));
            Observable<Integer> f5Observable = Observable.from(executor.submit(new CallToRemoteServiceE(i)));
            return Observable.merge(f4Observable, f5Observable);
        }
    });
This starts to get weird (merging them probably isn't what I want...), but it allows me to do this at the end, which is not quite what I want:
f3Observable.subscribe(new Action1<String>() {
    @Override
    public void call(String s) {
        System.out.println("Observed from f3: " + s);
        f4And5Observable.subscribe(new Action1<Integer>() {
            @Override
            public void call(Integer i) {
                System.out.println("Observed from f4 and f5: " + i);
            }
        });
    }
});
That gives me:
Observed from f3: responseB_responseA
Observed from f4 and f5: 140
Observed from f4 and f5: 5100
which is all the numbers, but unfortunately I get the results in separate invocations, so I can't quite replace the final println in the original code:
System.out.println(f3.get() + " => " + (f4.get() * f5.get()));
I don't understand how to get access to both those return values on the same line. I think there's probably some functional programming fu I'm missing here. How can I do this? Thanks.
It looks like all you really need is a bit more encouragement and perspective on how Rx is used. I'd suggest reading more of the documentation and studying the marble diagrams (I know they're not always useful). I also suggest looking into the lift() function and operators.
map, flatMap and filter are there to manipulate the data in your data flow. The point of operators is to let you interrupt a steady stream of observables and define your own operations on a data flow. For example, I coded a moving average operator that sums up n doubles in an Observable of doubles and returns a stream of moving averages. The code literally looked like this:
Observable movingAverage = Observable.from(mDoublesArray).lift(new MovingAverageOperator(frameSize));
You'll be relieved to know that a lot of the filtering methods you take for granted have lift() under the hood.
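To make the moving average idea concrete, here is a minimal plain-Java sketch of the windowing logic such an operator might apply to each incoming value. It is not the MovingAverageOperator mentioned above and it does not use RxJava at all; the class name MovingAverageSketch and the method movingAverages are hypothetical, and it assumes one average is emitted per full window of frameSize values.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical helper for illustration only; not the answer's MovingAverageOperator.
class MovingAverageSketch {

    // Returns one average per full window of frameSize consecutive values.
    // Assumption: nothing is emitted until the first window is full.
    static List<Double> movingAverages(double[] values, int frameSize) {
        if (frameSize <= 0) {
            throw new IllegalArgumentException("frameSize must be positive");
        }
        List<Double> averages = new ArrayList<>();
        Deque<Double> window = new ArrayDeque<>();
        double sum = 0.0;
        for (double value : values) {
            // Add the newest value to the window and to the running sum.
            window.addLast(value);
            sum += value;
            // Drop the oldest value once the window grows past frameSize.
            if (window.size() > frameSize) {
                sum -= window.removeFirst();
            }
            // Emit an average only when the window is full.
            if (window.size() == frameSize) {
                averages.add(sum / frameSize);
            }
        }
        return averages;
    }

    public static void main(String[] args) {
        // Prints [1.5, 2.5, 3.5]
        System.out.println(movingAverages(new double[] {1.0, 2.0, 3.0, 4.0}, 2));
    }
}
```

In an actual operator, the loop body would presumably live in the subscriber's onNext, with the window and running sum kept as per-subscription state.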
With that said, all it takes to merge multiple dependencies is:
map or flatMap