Process Observable on background thread

WonderCsabo · Oct 27, 2015 · Viewed 20k times

I am using RxAndroid for stream operations. In my real use case, I am fetching a list from the server (using Retrofit). I use schedulers to do the work on a background thread and receive the final emission on the Android UI (main) thread.

This works well for the network call; however, I realized that the operators after the network call do not run on the background thread but are called on the main thread.

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .subscribe(integer1 -> {});

How can I make sure that all operations are executed on a background thread?

Answer

nhaarman · Oct 27, 2015

TL;DR: move observeOn(AndroidSchedulers.mainThread()) below filter(...).


subscribeOn(...) designates the thread on which the Observable begins operating. Only the subscribeOn call closest to the source decides that thread; additional subscribeOn calls further down the chain do not change it.

Thus, if you were to write the following, everything would be executed on Schedulers.newThread():

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .subscribe(integer1 -> { doSomething(integer1); });

Now, of course, this is not what you want: you want doSomething to run on the main thread.
That is where observeOn comes into play. Every operator downstream of observeOn runs on the scheduler it names. Therefore, in your example, the filter is executed on the main thread.

Instead, move observeOn down to just before subscribe:

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(integer1 -> { doSomething(integer1); });

Now, filter will happen on the 'new thread', and doSomething on the main thread.


To go even further, you can use observeOn multiple times:

myService.fetchSomeIntegersFromServer()
        .subscribeOn(Schedulers.newThread())
        .observeOn(Schedulers.computation())
        .filter(integer -> {
            System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
            return true;
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(integer1 -> { doSomething(integer1); });

In this case, the fetching will occur on a new thread, the filtering on a computation thread, and doSomething on the main thread.
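
The hand-off pattern in that last chain can be tried outside Android with plain Java. The sketch below is only an analogy and does not use RxJava: it swaps in CompletableFuture with three named single-thread executors standing in for Schedulers.newThread(), Schedulers.computation() and AndroidSchedulers.mainThread(). The class name ThreadHopSketch, the executor names and the value 42 are made up for illustration. Each stage records the name of the thread it runs on, so you can see that a stage runs wherever the most recent hop sent it.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Plain-Java analogy (not RxJava): every stage records the thread it ran on.
final class ThreadHopSketch {

    // Runs fetch -> filter -> consume and returns the thread names in stage order.
    public static List<String> runPipeline() {
        // Hypothetical stand-ins for the three schedulers in the answer.
        ExecutorService fetchThread = named("fetch");
        ExecutorService computeThread = named("compute");
        ExecutorService mainThread = named("main-stand-in");
        List<String> seen = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                    // Plays the role of subscribeOn: the source works here.
                    .supplyAsync(() -> {
                        seen.add(Thread.currentThread().getName());
                        return 42;
                    }, fetchThread)
                    // Plays the role of the first observeOn: hop, then "filter".
                    .thenApplyAsync(value -> {
                        seen.add(Thread.currentThread().getName());
                        return value;
                    }, computeThread)
                    // Plays the role of the second observeOn: hop, then consume.
                    .thenAcceptAsync(value ->
                            seen.add(Thread.currentThread().getName()), mainThread)
                    .join(); // wait until the last stage has finished
        } finally {
            fetchThread.shutdown();
            computeThread.shutdown();
            mainThread.shutdown();
        }
        return seen;
    }

    // Creates a single-thread executor whose worker thread carries the given name.
    private static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, name));
    }

    public static void main(String[] args) {
        System.out.println(runPipeline()); // prints [fetch, compute, main-stand-in]
    }
}
```

Removing the middle thenApplyAsync hop and doing the "filter" work inside the first stage would correspond to the earlier two-thread version, where filtering stays on the fetch thread.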

Check out ReactiveX - SubscribeOn operator for the official documentation.