RxJava: How to interrupt thread on unsubscribe?

ntoskrnl · May 12, 2015 · Viewed 7.2k times

I use Observable.create() to create an observable that performs some work on a scheduler (e.g. Schedulers.io()) and then returns a result on AndroidSchedulers.mainThread().

val subscription = observable<T> {
    try {
        // perform action synchronously
        it.onNext(action.invoke(context, args))
        it.onCompleted()
    } catch (t: Exception) {
        it.onError(t)
    }
}.subscribeOn(scheduler)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        {
            // handle result here
            result.set(it)
        },
        {
            // handle error here
            errorHandler.handleTaskError(model, this, it)
        },
        {
            // notify completed
            model.completeTask(this)
        }
    )

The operation inside action.invoke() is synchronous and might be a blocking I/O operation. When the user decides to cancel it, I unsubscribe from the observable: subscription.unsubscribe()

However, the I/O operation is not interrupted. Is there an RxJava API to interrupt the operation?

Answer

dwursteisen · May 12, 2015

When you call yourSubscription.unsubscribe(), Rx will call your unsubscribe code.

That unsubscribe code lives in a Subscription that you add to your subscriber when you create your Observable:

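Observable<Object> obs = Observable.create(subscriber -> {

    subscriber.add(new Subscription() {
        // Tracks whether unsubscribe() has already been called
        private volatile boolean unsubscribed;

        @Override
        public void unsubscribe() {
            unsubscribed = true;
            // perform unsubscription (e.g. interrupt the running job)
        }

        @Override
        public boolean isUnsubscribed() {
            return unsubscribed;
        }
    });
    subscriber.onNext(/**...*/);
    subscriber.onCompleted();
});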

So in the unsubscribe method, you can interrupt your job, if you have a way to do so.

Please note that the unsubscribe method is called both when you unsubscribe from an Observable and when the Observable completes (it will then unsubscribe by itself).
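To make the two points above concrete, here is a minimal JDK-only sketch of what the unsubscribe callback could do. InterruptibleWork and its methods are hypothetical names, not part of RxJava, and the sketch assumes the blocking call reacts to Thread.interrupt() (Thread.sleep stands in for the I/O here). It remembers the thread running the job, interrupts it on cancel, and ignores a cancel that arrives after the job has finished, which matters because unsubscribe also runs on completion.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper (not an RxJava class): remembers which thread is running
// the blocking job so that an unsubscribe callback can interrupt it.
final class InterruptibleWork {
    private Thread worker;      // non-null only while the job is running
    private boolean cancelled;  // set by the first cancel()

    // Call on the scheduler thread, e.g. inside Observable.create(...).
    <T> T run(Callable<T> job) throws Exception {
        synchronized (this) {
            if (cancelled) {
                throw new CancellationException("cancelled before start");
            }
            worker = Thread.currentThread();
        }
        try {
            return job.call();
        } finally {
            synchronized (this) {
                worker = null; // from here on cancel() no longer interrupts
                if (cancelled) {
                    // Drop our own interrupt if the job never noticed it, so the
                    // flag does not leak into the next task on a pooled thread.
                    Thread.interrupted();
                }
            }
        }
    }

    // Call from unsubscribe(). Harmless after completion or when repeated.
    synchronized void cancel() {
        cancelled = true;
        if (worker != null) {
            worker.interrupt();
        }
    }

    // Demo: cancel while the job is blocked.
    static String cancelWhileRunning() {
        InterruptibleWork work = new InterruptibleWork();
        CountDownLatch started = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("still running");
        Thread scheduler = new Thread(() -> {
            try {
                work.run(() -> {
                    started.countDown();
                    Thread.sleep(60_000); // stands in for blocking I/O
                    return "result";
                });
                outcome.set("completed");
            } catch (InterruptedException e) {
                outcome.set("interrupted");
            } catch (Exception e) {
                outcome.set("failed: " + e);
            }
        });
        scheduler.start();
        try {
            started.await();       // wait until the job is really running
            work.cancel();         // what unsubscribe() would do
            scheduler.join(5_000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome.get();
    }

    // Demo: a cancel after completion must leave the calling thread alone.
    static boolean cancelAfterCompletionInterrupts() {
        InterruptibleWork work = new InterruptibleWork();
        try {
            work.run(() -> "result");
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        work.cancel();
        return Thread.currentThread().isInterrupted();
    }

    public static void main(String[] args) {
        System.out.println(cancelWhileRunning());              // prints "interrupted"
        System.out.println(cancelAfterCompletionInterrupts()); // prints "false"
    }
}
```

Inside Observable.create you would presumably register the callback before starting the job, for example with subscriber.add(Subscriptions.create(work::cancel)), and then wrap the blocking call in work.run(...). Keep in mind that classic java.io stream and socket reads generally do not react to interruption; for those, closing the stream or socket from the unsubscribe callback is the usual way to unblock the thread.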

Edit: taking Vladimir Mironov's comment into account.