Queuing tasks with RxJava in Android

Darko Smoljo · Apr 11, 2016 · Viewed 7.3k times

I'm developing an Android application with background data synchronization. I'm currently using RxJava to post some data to the server at regular intervals. In addition, I'd like to give the user a "force sync" button that triggers a sync immediately. I know how to use Observable.interval() to push data at regular intervals, and I know how to use Observable.just() to push the forced one, but I'd like to queue them up if one is triggered while the previous one is still running.

Take an example where the automatic sync interval is 1 minute and a sync lasts 40 seconds (I'm exaggerating to make the point easier). If the user presses the "force" button while the automatic sync is still running (or vice versa, the automatic sync triggers while the forced one is still running), I'd like to queue the second sync request so that it starts as soon as the first one finishes.

I've drawn this image, which may put it in perspective:

[Image: timeline of automatic and forced sync requests]

As you can see, the automatic sync is triggered (by some Observable.interval()), and in the middle of syncing the user presses the "force" button. Now we want to wait for the first request to finish and then start the forced request. At one point, while the forced request is running, a new automatic request is triggered, which is just added to the queue. After the last request in the queue finishes, everything stops, and the automatic sync is scheduled again a little later.

I hope somebody can point me to the correct operator for this. I've tried Observable.combineLatest(), but the queued list was dispatched at the beginning, and when I added a new sync to the queue, it did not continue after the previous operation completed.

Any help is greatly appreciated, Darko

Answer

akarnokd · Apr 11, 2016

You can do this by merging the timer with the button-click Observable/Subject, using the queueing effect of onBackpressureBuffer, and applying concatMap to the processing, which makes sure that the tasks run one at a time.

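import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.functions.Func1;
import rx.subjects.PublishSubject;

public class SyncQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // emits whenever the user forces a sync
        PublishSubject<Long> subject = PublishSubject.create();

        // emits on a fixed schedule (1 second here to keep the demo short)
        Observable<Long> periodic = Observable.interval(1, 1, TimeUnit.SECONDS);

        periodic.mergeWith(subject)
                // queues triggers that arrive while a task is still running
                .onBackpressureBuffer()
                // runs one task at a time, in arrival order
                .concatMap(new Func1<Long, Observable<Integer>>() {
                    @Override
                    public Observable<Integer> call(Long v) {
                        // simulates the task to run
                        return Observable.just(1)
                                .delay(300, TimeUnit.MILLISECONDS);
                    }
                })
                .subscribe(System.out::println, Throwable::printStackTrace);

        Thread.sleep(1100);
        // user clicks a button
        subject.onNext(-1L);

        // keep the main thread alive so the remaining tasks can run
        Thread.sleep(800);
    }
}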
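
For comparison, the one-at-a-time queueing that concatMap provides here can be sketched without RxJava. This is not the approach from the answer above: it swaps in a plain single-thread executor, and the SyncQueue class, its request() and drain() methods, and the 50 ms sleep standing in for the upload are all hypothetical names and values chosen for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of RxJava: runs sync requests one at a time, in arrival order.
public class SyncQueue {
    // A single worker thread with an unbounded FIFO queue behind it.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    // Records what happened so the ordering can be inspected afterwards.
    private final List<String> log = new CopyOnWriteArrayList<>();

    // Would be called by both the periodic timer and the "force sync" button.
    public void request(String source) {
        worker.execute(() -> {
            log.add("start " + source);
            try {
                Thread.sleep(50); // stands in for the real upload (assumed duration)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.add("end " + source);
        });
    }

    // Stops accepting new requests and waits for the queued ones to finish.
    public List<String> drain() throws InterruptedException {
        worker.shutdown();
        worker.awaitTermination(5, TimeUnit.SECONDS);
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        SyncQueue queue = new SyncQueue();
        queue.request("automatic");
        queue.request("forced");    // arrives while "automatic" is still running
        queue.request("automatic"); // queued behind "forced"
        for (String line : queue.drain()) {
            System.out.println(line);
        }
    }
}
```

Because there is only one worker thread, a request that arrives during a running sync waits in the executor's queue and starts as soon as the previous one ends, which is the behavior described in the question. The RxJava version keeps the trigger sources composable as Observables, which this sketch does not attempt.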