Chaining RxJava observables with callbacks/listeners

hiBrianLee · Apr 16, 2015

I am using Retrofit with Observables and would like to chain them. This usually works well with operators like map() or flatMap(), since the API returns an Observable that does the work. In this case, however, I have to do the following:

  1. getKey() from the api
  2. Use the returned key in another library Foo and wait for the callback to be called.
  3. When the callback returns, send the result to the api.

I'd like this to be a single chained call so that I only have to subscribe once. I'm guessing I can use merge() or join() or something similar, but I'm not sure what the best approach is for handling the callback.

Is there a way to make this better? This is what I have so far:

api.getKey().subscribe(new Action1<String>() {
   @Override
   public void call(String key) {
      Foo foo = new Foo();
      foo.setAwesomeCallback(new AwesomeCallback() {
         @Override
         public void onAwesomeReady(String awesome) {
            api.sendAwesome(awesome)
                    .subscribe(new Action1<Void>() {
                       @Override
                       public void call(Void aVoid) {
                           handleAwesomeSent();
                       }
                    });
         }
      });
      foo.makeAwesome();
   }
});

Answer

david.mihola · Apr 16, 2015

Adapting clemp6r's solution, here is another one that needs neither Subjects nor nested Subscriptions:

// RxJava 1.x types used: rx.Observable, rx.Subscriber, rx.functions.Func1, rx.functions.Action1
api.getKey().flatMap(new Func1<String, Observable<String>>() {
    @Override
    public Observable<String> call(String key) {
        // Wrap Foo's callback in an Observable that emits the result once and then completes.
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(final Subscriber<? super String> subscriber) {
                // Pass key to foo here as its API requires (not shown in the question).
                Foo foo = new Foo();
                foo.setAwesomeCallback(new AwesomeCallback() {
                    @Override
                    public void onAwesomeReady(String awesome) {
                        if (!subscriber.isUnsubscribed()) {
                            subscriber.onNext(awesome);
                            subscriber.onCompleted();
                        }
                    }
                });
                foo.makeAwesome();
            }
        });
    }
}).flatMap(new Func1<String, Observable<Void>>() {
    @Override
    public Observable<Void> call(String awesome) {
        // sendAwesome() returns Observable<Void>, matching the Action1<Void> subscriber below.
        return api.sendAwesome(awesome);
    }
}).subscribe(new Action1<Void>() {
    @Override
    public void call(Void aVoid) {
        handleAwesomeSent();
    }
});

In general, I think it is always possible to wrap any callback-based asynchronous operation in an Observable using Observable.create().
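
The same wrapping idea carries over outside RxJava. As a dependency-free sketch, the following uses the JDK's CompletableFuture in place of Observable, with thenCompose playing the role of flatMap. Foo, AwesomeCallback, getKey() and sendAwesome() here are hypothetical stand-ins (the real Foo and api are not shown in the question), and the Foo(String key) constructor and the String return type of sendAwesome() are assumptions made only so the sketch can run on its own.

```java
import java.util.concurrent.CompletableFuture;

class CallbackChainSketch {

    // Hypothetical stand-in for the library's callback interface.
    interface AwesomeCallback {
        void onAwesomeReady(String awesome);
    }

    // Hypothetical stand-in for Foo. Taking the key in the constructor is an
    // assumption; the question does not show how Foo receives it.
    static class Foo {
        private final String key;
        private AwesomeCallback callback;

        Foo(String key) {
            this.key = key;
        }

        void setAwesomeCallback(AwesomeCallback callback) {
            this.callback = callback;
        }

        // Fires the callback from a background thread to mimic asynchronous work.
        void makeAwesome() {
            final AwesomeCallback cb = callback;
            new Thread(() -> cb.onAwesomeReady("awesome-" + key)).start();
        }
    }

    // The wrapping step: the returned future completes when the callback fires.
    static CompletableFuture<String> makeAwesome(String key) {
        CompletableFuture<String> result = new CompletableFuture<>();
        Foo foo = new Foo(key);
        foo.setAwesomeCallback(result::complete);
        foo.makeAwesome();
        return result;
    }

    // Hypothetical stand-ins for the two API calls.
    static CompletableFuture<String> getKey() {
        return CompletableFuture.completedFuture("key-1");
    }

    static CompletableFuture<String> sendAwesome(String awesome) {
        return CompletableFuture.completedFuture("sent:" + awesome);
    }

    // One chain, one place to wait for the result; no nested callbacks.
    static CompletableFuture<String> runChain() {
        return getKey()
                .thenCompose(CallbackChainSketch::makeAwesome)
                .thenCompose(CallbackChainSketch::sendAwesome);
    }

    public static void main(String[] args) {
        System.out.println(runChain().join());
    }
}
```

Running main prints sent:awesome-key-1. The only callback-aware code is makeAwesome(String key); everything downstream sees an ordinary composable value, which is the same benefit Observable.create() gives in the RxJava version.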