I use Observable.create() to create an observable that performs some work on a scheduler (e.g. Schedulers.io()) and then returns the result on AndroidSchedulers.mainThread().
val subscription = observable<T> {
    try {
        // perform action synchronously
        it.onNext(action.invoke(context, args))
        it.onCompleted()
    } catch (t: Exception) {
        it.onError(t)
    }
}.subscribeOn(scheduler)
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        {
            // handle result here
            result.set(it)
        },
        {
            // handle error here
            errorHandler.handleTaskError(model, this, it)
        },
        {
            // notify completed
            model.completeTask(this)
        }
    )
The operation inside action.invoke() is synchronous and might be a blocking I/O operation. When the user decides to cancel it, I unsubscribe from the observable with subscription.unsubscribe(). However, the I/O operation is not interrupted. Is there any RxJava API to interrupt the operation?
When you call yourSubscription.unsubscribe(), Rx will call your unsubscribe code. That unsubscribe code lives in a Subscription that you can add to your subscriber when you create your Observable.
Observable<Object> obs = Observable.create(subscriber -> {
    subscriber.add(new Subscription() {
        // tracks state so isUnsubscribed() reports the truth
        private volatile boolean unsubscribed;

        @Override
        public void unsubscribe() {
            unsubscribed = true;
            // perform unsubscription
        }

        @Override
        public boolean isUnsubscribed() {
            return unsubscribed;
        }
    });
    subscriber.onNext(/**...*/);
    subscriber.onCompleted();
});
So in the unsubscribe method, you can interrupt your job, if you have a way to do so.
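As a rough illustration of "interrupt your job", here is a minimal sketch in plain Java that runs without RxJava. The Cancellable interface is a hypothetical stand-in for rx.Subscription, and Thread.sleep stands in for the blocking call; the class and method names are made up for this example. The idea is that the unsubscribe hook keeps a reference to the worker thread and interrupts it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class InterruptOnUnsubscribe {

    /** Hypothetical stand-in for rx.Subscription, so the sketch has no dependencies. */
    interface Cancellable {
        void unsubscribe();
        boolean isUnsubscribed();
    }

    /** Starts a blocking job on its own thread; unsubscribe() interrupts that thread. */
    static Cancellable startBlockingJob(CountDownLatch started,
                                        AtomicBoolean sawInterrupt,
                                        CountDownLatch finished) {
        Thread worker = new Thread(() -> {
            started.countDown();
            try {
                Thread.sleep(60_000); // placeholder for the blocking operation
            } catch (InterruptedException e) {
                sawInterrupt.set(true); // the job noticed the cancellation
            } finally {
                finished.countDown();
            }
        });
        worker.setDaemon(true);
        worker.start();

        AtomicBoolean unsubscribed = new AtomicBoolean(false);
        return new Cancellable() {
            @Override
            public void unsubscribe() {
                // interrupt only once, on the first unsubscribe call
                if (unsubscribed.compareAndSet(false, true)) {
                    worker.interrupt();
                }
            }

            @Override
            public boolean isUnsubscribed() {
                return unsubscribed.get();
            }
        };
    }

    /** Cancels a running job and reports whether the job observed the interrupt. */
    static boolean cancelWasObserved() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(1);
        AtomicBoolean sawInterrupt = new AtomicBoolean(false);
        Cancellable job = startBlockingJob(started, sawInterrupt, finished);
        try {
            started.await();
            job.unsubscribe();
            boolean done = finished.await(5, TimeUnit.SECONDS);
            return done && sawInterrupt.get() && job.isUnsubscribed();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("job interrupted: " + cancelWasObserved());
    }
}
```

Keep in mind that Thread.interrupt() only wakes calls that respond to interruption, such as Thread.sleep or NIO interruptible channels. A classic blocking stream or socket read usually ignores it, so there the unsubscribe hook would more likely need to close the underlying resource instead.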
Please note that the unsubscribe method is called both when you unsubscribe from an Observable and when the Observable completes (it will unsubscribe by itself).
Edit: taking Vladimir Mironov's comment into account.