Mock Retrofit Observable<T> response in Android Unit tests

I have an API interface and I'm testing a View that involves network calls.

@Config(emulateSdk = 18)
public class SampleViewTest extends RobolectricTestBase {

    ServiceApi apiMock;

    SampleView fixture;

    public void setUp() {
        super.setUp(); //injection is performed in super
        apiMock = mock(ServiceApi.class);
        fixture = new SampleView(activity);
    }

    public void testSampleViewCallback() {
        when(apiMock.requestA()).thenReturn(Observable.from(new ResponseA()));
        when(apiMock.requestB()).thenReturn(Observable.from(new ResponseB()));

        AtomicReference<Object> testResult = new AtomicReference<>();
        fixture.updateView(new Callback() {

            public void onSuccess(Object result) {
            }

            public void onError(Throwable error) {
                throw new RuntimeException(error);
            }
        });

        verify(apiMock, times(1)).requestA();
        verify(apiMock, times(1)).requestB();
    }
}


For some reason apiMock methods are never called and verification always fails.

In my view I'm calling my API like this:

    .subscribe(new Observer());

What am I missing here?

Update #1:
After some investigation, it appears that when my implementation (sample above) calls observeOn(AndroidSchedulers.mainThread()), the subscriber is never called. I still don't know why.

Update #2:
When I subscribe directly, as in apiV2.requestA().subscribe(new Observer());, everything works just fine: the mock API is called and the test passes.
Advancing the main looper with ShadowLooper.idleMainLooper(5000) did nothing. I even grabbed the looper from the handler in HandlerThreadScheduler and advanced it, with the same result.

Update #3: Adding actual code where API is used.

public void updateView(final Callback callback) {
    Observable.zip(wrapObservable(api.requestA()), wrapObservable(api.requestB()),
        new Func2<ResponseA, ResponseB, Object>() {
            public Object call(ResponseA responseA, ResponseB responseB) {
                return mergeBothResponses(responseA, responseB);
            }
        }
    ).subscribe(new EndlessObserver<Object>() {

        public void onError(Throwable e) {
        }

        public void onNext(Object config) {
            Log.d("Configuration updated [%s]", config.toString());
        }
    });
}

protected <T> Observable<T> wrapObservable(Observable<T> observable) {
    return observable.subscribeOn(;
}


Miguel Lavigne · Jun 9, 2014

I'm still wrapping my head around how to properly use RxJava myself, but I would try modifying your code so that you only observeOn(mainThread) on the final zipped Observable instead of on each of the two original request Observables. I would then check whether this changes the fact that you have to advance both Loopers.

To simplify your tests and remove the need for Looper idling, I would take threading out of the equation, since you don't need background processing when running tests. You can do that by having your Schedulers injected instead of creating them statically. In production code you'd have AndroidSchedulers.mainThread and a background scheduler injected, and in test code you would inject Schedulers.immediate where applicable.

@UIScheduler /* Inject Schedulers.immediate for tests and AndroidSchedulers.mainThread for production code */
private Scheduler mainThreadSched;

@IOScheduler /* Inject Schedulers.immediate for tests and a background scheduler for production code */
private Scheduler ioSched;

public void updateView(final Callback callback) {
    Observable.zip(wrapObservable(api.requestA()), wrapObservable(api.requestB()),
        new Func2<ResponseA, ResponseB, Object>() {
            public Object call(ResponseA responseA, ResponseB responseB) {
                return mergeBothResponses(responseA, responseB);
            }
        })
    .observeOn(mainThreadSched) // observe only the final zipped Observable
    .subscribe(new EndlessObserver<Object>() {

        public void onError(Throwable e) {
        }

        public void onNext(Object config) {
            Log.d("Configuration updated [%s]", config.toString());
        }
    });
}

protected <T> Observable<T> wrapObservable(Observable<T> observable) {
    return observable.subscribeOn(ioSched);
}
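
To see why injecting the scheduler helps, here is a minimal sketch that uses only the Java standard library rather than RxJava. It is an analogy, not the RxJava API: the hypothetical QueueingExecutor stands in for a main-thread looper that holds callbacks until it is drained, and Runnable::run plays the role that Schedulers.immediate plays in the answer above. ConfigLoader and the "A+B" value are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;

class SchedulerInjectionSketch {

    /** Hypothetical stand-in for a looper: tasks wait in a queue until drain() runs them. */
    static final class QueueingExecutor implements Executor {
        private final Queue<Runnable> pending = new ArrayDeque<>();

        @Override
        public void execute(Runnable task) {
            pending.add(task);
        }

        void drain() {
            Runnable task;
            while ((task = pending.poll()) != null) {
                task.run();
            }
        }
    }

    /** Hypothetical view-like class that gets its callback executor injected. */
    static final class ConfigLoader {
        private final Executor callbackExecutor;

        ConfigLoader(Executor callbackExecutor) {
            this.callbackExecutor = callbackExecutor;
        }

        void load(List<String> sink) {
            String merged = "A+B"; // stands in for the merged responses
            callbackExecutor.execute(() -> sink.add(merged));
        }
    }

    public static void main(String[] args) {
        // Production-like wiring: the callback is queued and nothing is delivered yet.
        QueueingExecutor looper = new QueueingExecutor();
        List<String> queued = new ArrayList<>();
        new ConfigLoader(looper).load(queued);
        System.out.println(queued.size()); // prints 0
        looper.drain();
        System.out.println(queued.size()); // prints 1

        // Test wiring: a same-thread executor delivers the callback synchronously.
        List<String> direct = new ArrayList<>();
        new ConfigLoader(Runnable::run).load(direct);
        System.out.println(direct); // prints [A+B]
    }
}
```

With the queueing executor, an assertion made right after load() would fail in the same way the verify calls in the question do. With the same-thread executor, the test needs no looper handling at all.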