What is the proper way to handle subscriptions in RxJava/RxAndroid for an Activity Lifecycle?

Sree · Aug 3, 2015 · Viewed 14k times

I am just getting started on RxJava/RxAndroid. I want to avoid context leaks so I created a BaseFragment like so:

public abstract class BaseFragment extends Fragment {

    protected CompositeSubscription compositeSubscription = new CompositeSubscription();

    @Override
    public void onDestroy() {
        super.onDestroy();

        compositeSubscription.unsubscribe();
    } 
} 

And inside my fragment that extends BaseFragment, I am doing this:

protected void fetchNewerObjects(){
        if(!areNewerObjectsFetching()){ //if it is not already fetching newer objects

            Runtime.getRuntime().gc();//clean out memory if possible

            fetchNewObjectsSubscription = Observable
                .just(new Object1())
                .map(new Func1<Object1, Object2>() {
                    @Override
                    public Object2 call(Object1 obj1) {
                        //do bg stuff
                        return obj2;
                    }
                })
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Object2>() {
                    @Override
                    public void onCompleted() {
                        compositeSubscription.remove(fetchNewObjectsSubscription);
                        fetchNewObjectsSubscription = null;
                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onNext(Object2 obj2) {
                        //do stuff
                    }
                });

        //add subscription to composite subscription so it can be unsubscribed onDestroy()
        compositeSubscription.add(fetchNewObjectsSubscription);
    }
}

protected boolean areNewerObjectsFetching(){
    if(fetchNewObjectsSubscription == null || fetchNewObjectsSubscription.isUnsubscribed()){ //if it's either null or already finished
        return false;
    }
    return true;
}

So I guess my question is two-fold:

  1. Will this stop context leaks, given that I am unsubscribing in onDestroy()?

  2. Am I properly keeping track of whether the observable is "running" by setting the subscription to null after completion and then checking for null?

Answer

marwinXXII · Aug 5, 2015
  1. Yes, it will stop the leak, but you should also set the subscription to null in onError (otherwise, after an error, you won't load items again).

    Also, do not forget that a fragment can be stopped without being destroyed (while it is in the back stack, for example), and you might not want to observe anything in that state. If you move the unsubscribe call from onDestroy to onStop, remember to initialise compositeSubscription in onCreateView each time the view is created, because once a CompositeSubscription has been unsubscribed you can no longer add subscriptions to it.

  2. Yes, that is correct. However, I think the compositeSubscription.remove call can be omitted, because you already check for null.
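To make the point about re-initialising the composite concrete, here is a minimal sketch in plain Java with no RxJava or Android dependency. Everything in it is a hypothetical stand-in: Sub, FlagSub, CompositeSub and LifecycleModel are simplified, single-threaded models written for this illustration, not the library's classes. The one behaviour it assumes from RxJava 1.x is that adding to an already unsubscribed CompositeSubscription immediately unsubscribes the newcomer.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for rx.Subscription (RxJava 1.x). */
interface Sub {
    void unsubscribe();
    boolean isUnsubscribed();
}

/** Trivial Sub that only records whether it has been unsubscribed. */
final class FlagSub implements Sub {
    private boolean unsubscribed;
    @Override public void unsubscribe() { unsubscribed = true; }
    @Override public boolean isUnsubscribed() { return unsubscribed; }
}

/** Simplified model of the add/unsubscribe contract of CompositeSubscription. */
final class CompositeSub implements Sub {
    private final List<Sub> subs = new ArrayList<>();
    private boolean unsubscribed;

    void add(Sub s) {
        if (unsubscribed) {
            // A spent composite cannot hold anything: the newcomer is cancelled at once.
            s.unsubscribe();
            return;
        }
        subs.add(s);
    }

    @Override public void unsubscribe() {
        unsubscribed = true;
        for (Sub s : subs) {
            s.unsubscribe();
        }
        subs.clear();
    }

    @Override public boolean isUnsubscribed() { return unsubscribed; }
}

/** Hypothetical fragment-like holder; method names mirror the lifecycle callbacks by name only. */
final class LifecycleModel {
    private CompositeSub composite = new CompositeSub();

    void onCreateView() {
        // Replace a spent composite so that new work can be tracked again.
        if (composite.isUnsubscribed()) {
            composite = new CompositeSub();
        }
    }

    void onStop() {
        composite.unsubscribe();
    }

    /** Starts a piece of (pretend) work and registers it with the composite. */
    Sub startWork() {
        Sub s = new FlagSub();
        composite.add(s);
        return s;
    }
}
```

In this model, work started after onStop() but before the next onCreateView() comes back already unsubscribed, which is the failure mode the answer warns about. Work started after onCreateView() stays active until the next onStop().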