I'm experimenting with RxJava and Java 8's CompletableFuture class and do not quite get how to handle timeout conditions.
import static net.javacrumbs.futureconverter.java8rx.FutureConverter.toObservable;
// ...
Observable<String> doSomethingSlowly() {
    CompletableFuture<PaymentResult> task = CompletableFuture.supplyAsync(() -> {
        // this call may be very slow - if it takes too long,
        // we want to time out and cancel it.
        return processor.slowExternalCall();
    });
    return toObservable(task);
}
// ...
doSomethingSlowly()
    .single()
    .timeout(3, TimeUnit.SECONDS, Observable.just("timeout"));
This basically works (if the timeout of three seconds is reached, "timeout" is published). However, I would also like to cancel the future task that I've wrapped in an Observable. Is that possible with an RxJava-centric approach?
I know that one option would be to handle the timeout myself, using task.get(3, TimeUnit.SECONDS), but I wonder if it's possible to do all of the task handling in RxJava.
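For comparison, the manual approach mentioned above might look roughly like the following sketch. It uses only java.util.concurrent. The class ManualTimeout and the helpers getOrFallback and sleepThenReturn are hypothetical names made up for illustration, and sleepThenReturn merely stands in for processor.slowExternalCall().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

public class ManualTimeout {

    // Hypothetical helper: waits up to timeoutMillis for the result,
    // otherwise cancels the future and returns the fallback value.
    static <T> T getOrFallback(Supplier<T> slowCall, long timeoutMillis, T fallback) {
        CompletableFuture<T> task = CompletableFuture.supplyAsync(slowCall);
        try {
            return task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            task.cancel(true); // marks the future as cancelled
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            task.cancel(true);
            return fallback;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // Hypothetical stand-in for a slow external call.
    static String sleepThenReturn(long millis, String value) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    public static void main(String[] args) {
        // The call takes 2 s but we only wait 200 ms, so the fallback is used.
        System.out.println(getOrFallback(() -> sleepThenReturn(2000, "Something"), 200, "timeout"));
        // The call finishes well within the timeout, so its result is used.
        System.out.println(getOrFallback(() -> sleepThenReturn(10, "Something"), 2000, "timeout"));
    }
}
```

This blocks the calling thread while waiting, which is the main reason to prefer keeping the timeout inside the Observable chain.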
Yes, you can do this. You would add a Subscription to the Subscriber.
This allows you to listen in on unsubscriptions, which happen if you explicitly call subscribe().unsubscribe() or when the Observable terminates, either successfully or with an error.
If you see an unsubscription before the future has completed, you can assume it's because of either an explicit unsubscribe or a timeout.
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Subscription;

public class FutureTest {
    public static void main(String[] args) throws IOException {
        doSomethingSlowly()
            .timeout(1, TimeUnit.SECONDS, Observable.just("timeout"))
            .subscribe(System.out::println);
        System.in.read(); // keep process alive
    }

    private static Observable<String> doSomethingSlowly() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000); // simulate a slow call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // restore the interrupt flag
            }
            return "Something";
        });
        return toObservable(future);
    }

    private static <T> Observable<T> toObservable(CompletableFuture<T> future) {
        return Observable.create(subscriber -> {
            // Cancel the future when the subscriber unsubscribes (explicitly or via timeout).
            subscriber.add(new Subscription() {
                // volatile: unsubscribe() may be called from a different thread
                private volatile boolean unsubscribed = false;

                @Override
                public void unsubscribe() {
                    if (!future.isDone()) {
                        future.cancel(true);
                    }
                    unsubscribed = true;
                }

                @Override
                public boolean isUnsubscribed() {
                    return unsubscribed;
                }
            });
            // Forward the result or the failure unless the subscriber is already gone.
            future.thenAccept(value -> {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onNext(value);
                    subscriber.onCompleted();
                }
            }).exceptionally(throwable -> {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onError(throwable);
                }
                return null;
            });
        });
    }
}
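One caveat: according to the CompletableFuture Javadoc, the mayInterruptIfRunning argument of cancel has no effect in that implementation. So future.cancel(true) marks the future as cancelled but does not interrupt the thread that is running the slow call. The sketch below, which assumes only the JDK, compares this with a Future obtained from an ExecutorService, where cancel(true) does interrupt the worker. CancelDemo, Probe and the method names are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;

public class CancelDemo {

    // Hypothetical stand-in for a slow call that records whether it was interrupted.
    static final class Probe {
        final CountDownLatch started = new CountDownLatch(1);
        final CountDownLatch finished = new CountDownLatch(1);
        final AtomicBoolean interrupted = new AtomicBoolean(false);

        String slowCall() {
            started.countDown();
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                interrupted.set(true);
            } finally {
                finished.countDown();
            }
            return "Something";
        }
    }

    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Cancels a running CompletableFuture and reports whether the worker saw an interrupt.
    static boolean completableFutureInterrupted() {
        Probe probe = new Probe();
        CompletableFuture<String> future = CompletableFuture.supplyAsync(probe::slowCall);
        await(probe.started);
        future.cancel(true); // future becomes cancelled; the worker keeps sleeping
        await(probe.finished);
        return probe.interrupted.get();
    }

    // Same experiment with a Future returned by an ExecutorService.
    static boolean executorFutureInterrupted() {
        Probe probe = new Probe();
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = probe::slowCall;
            Future<String> future = executor.submit(task);
            await(probe.started);
            future.cancel(true); // interrupts the worker thread
            await(probe.finished);
            return probe.interrupted.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("CompletableFuture worker interrupted: " + completableFutureInterrupted()); // false
        System.out.println("ExecutorService worker interrupted: " + executorFutureInterrupted());      // true
    }
}
```

If the slow call really has to stop, one option is to submit it to your own ExecutorService and cancel that Future from unsubscribe(). Whether the call then stops still depends on it responding to interruption.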