I am using RxAndroid for stream operations. In my real use case, I am fetching a list from the server (using Retrofit). I am using schedulers to do the work on a background thread and receive the final emission on the Android UI (main) thread.
This works well for the network call; however, I realized that the operators after the network call do not use the background thread but are called on the main thread instead.
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.subscribe(integer1 -> {});
How can I make sure that all operations are executed on a background thread?
TL;DR: move observeOn(AndroidSchedulers.mainThread()) below filter(...).
subscribeOn(...) designates the thread on which the Observable starts operating. Subsequent calls to subscribeOn further down the chain are ignored.
Thus, if you were to write the following, everything would be executed on Schedulers.newThread():
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.subscribe(integer1 -> { doSomething(integer1); });
Now, of course, this is not what you want: you want doSomething to run on the main thread.
That is where observeOn comes into play. All operators after observeOn are executed on that scheduler. Therefore, in your example, the filter is executed on the main thread.
Instead, move observeOn down to just before subscribe:
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer1 -> { doSomething(integer1); });
Now, filter will happen on the 'new thread', and doSomething on the main thread.
To go even further, you can use observeOn multiple times:
myService.fetchSomeIntegersFromServer()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.computation())
.filter(integer -> {
System.out.println(Looper.getMainLooper().getThread() == Thread.currentThread());
return true;
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(integer1 -> { doSomething(integer1); });
In this case, the fetching will occur on a new thread, the filtering on a computation thread, and doSomething on the main thread.
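If you want to see the hand-off between threads without pulling in RxJava or Android, the same idea can be sketched with plain java.util.concurrent. This is only an analogy, not how RxJava is implemented: supplyAsync with an executor roughly plays the role of subscribeOn, and each ...Async stage with its own executor roughly plays the role of an observeOn placed just before that stage. The class name ThreadHopSketch, the helper named(...), and the thread names "io", "computation" and "main-like" are all made up for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class ThreadHopSketch {

    // Hypothetical helper: a single-thread executor whose thread has a readable name.
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true); // do not keep the JVM alive after main returns
            return thread;
        });
    }

    // Runs fetch, filter and consume on three different executors and
    // records which thread each stage actually ran on.
    static List<String> run() {
        ExecutorService io = named("io");                   // stand-in for Schedulers.newThread()
        ExecutorService computation = named("computation"); // stand-in for Schedulers.computation()
        ExecutorService mainLike = named("main-like");      // stand-in for AndroidSchedulers.mainThread()
        List<String> stages = new CopyOnWriteArrayList<>();
        try {
            CompletableFuture
                // The source runs on "io", similar to subscribeOn(...).
                .supplyAsync(() -> {
                    stages.add("fetch:" + Thread.currentThread().getName());
                    return Arrays.asList(1, 2, 3, 4);
                }, io)
                // Hop to "computation" for the filtering step.
                .thenApplyAsync(numbers -> {
                    stages.add("filter:" + Thread.currentThread().getName());
                    return numbers.stream().filter(n -> n % 2 == 0).collect(Collectors.toList());
                }, computation)
                // Hop to "main-like" only for the final consumer.
                .thenAcceptAsync(evens ->
                    stages.add("consume:" + Thread.currentThread().getName()), mainLike)
                .join();
        } finally {
            io.shutdown();
            computation.shutdown();
            mainLike.shutdown();
        }
        return stages;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [fetch:io, filter:computation, consume:main-like]
    }
}
```

As in the RxJava chain, each stage runs on whichever executor was named most recently before it, which is why the thread switch for the consumer belongs at the very end.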
Check out ReactiveX - SubscribeOn operator for the official documentation.