Cancel task on timeout in RxJava

jstaffans · Apr 18, 2015

I'm experimenting with RxJava and Java 8's CompletableFuture class and do not quite get how to handle timeout conditions.

import static net.javacrumbs.futureconverter.java8rx.FutureConverter.toObservable;

// ...

    Observable<String> doSomethingSlowly() {
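        // Typed as String to match the Observable<String> return type
        // (assumes slowExternalCall() returns a String).
        CompletableFuture<String> task = CompletableFuture.supplyAsync(() -> {
            // This call may be very slow. If it takes too long,
            // we want to time out and cancel it.
            return processor.slowExternalCall();
        });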

        return toObservable(task);
    }

    // ...

    doSomethingSlowly()
        .single()
        .timeout(3, TimeUnit.SECONDS, Observable.just("timeout"));

This basically works: if the three-second timeout is reached, "timeout" is emitted. However, I would also like to cancel the future task that I've wrapped in an Observable. Is that possible with an RxJava-centric approach?

I know that one option would be to handle the timeout myself, using task.get(3, TimeUnit.SECONDS), but I wonder whether it's possible to do all of the task handling in RxJava.
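For comparison, the manual approach mentioned above might look roughly like the following sketch. It uses only the JDK; `callWithTimeout` and `sleepThenReturn` are hypothetical helper names introduced for illustration, and the sleep stands in for the slow external call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

class ManualTimeoutSketch {

    // Hypothetical helper: runs slowCall asynchronously, waits at most
    // timeoutMillis for its result, and returns fallback on timeout.
    static String callWithTimeout(Supplier<String> slowCall, long timeoutMillis, String fallback) {
        CompletableFuture<String> task = CompletableFuture.supplyAsync(slowCall);
        try {
            return task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Marks the future as cancelled; the worker thread keeps running.
            task.cancel(true);
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            task.cancel(true);
            return fallback;
        } catch (ExecutionException e) {
            throw new IllegalStateException("slow call failed", e.getCause());
        }
    }

    // Hypothetical stand-in for a slow external call.
    static String sleepThenReturn(long millis, String value) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return value;
    }

    public static void main(String[] args) {
        // Slow call exceeds the timeout: prints "timeout"
        System.out.println(callWithTimeout(() -> sleepThenReturn(1000, "Something"), 200, "timeout"));
        // Fast call finishes in time: prints "Something"
        System.out.println(callWithTimeout(() -> "Something", 5000, "timeout"));
    }
}
```

The drawback is that the calling thread blocks in `get`, which is what an Observable-based approach avoids.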

Answer

Rob Worsnop · Apr 18, 2015

Yes, you can do this. Add a Subscription to the Subscriber with subscriber.add(...).

This lets you react to unsubscription, which happens when you explicitly call unsubscribe() on the Subscription returned by subscribe(), or when the Observable terminates, either successfully or with an error.

If you see an unsubscription before the future has completed, you can assume it was caused by either an explicit unsubscribe or a timeout.

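import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import rx.Observable;
import rx.Subscription;

public class FutureTest {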
    public static void main(String[] args) throws IOException {
        doSomethingSlowly()
                .timeout(1, TimeUnit.SECONDS, Observable.just("timeout"))
                .subscribe(System.out::println);
        System.in.read(); // keep process alive
    }

    private static Observable<String> doSomethingSlowly() {
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                Thread.sleep(2000); // simulate a slow call
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
            return "Something";
        });
        return toObservable(future);
    }

    private static <T> Observable<T> toObservable(CompletableFuture<T> future) {
        return Observable.create(subscriber -> {
            subscriber.add(new Subscription() {
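                // volatile: unsubscribe() and isUnsubscribed() may run on different threads
                private volatile boolean unsubscribed = false;

                @Override
                public void unsubscribe() {
                    // Note: CompletableFuture.cancel ignores its mayInterruptIfRunning
                    // argument, so this marks the future as cancelled but does not
                    // interrupt the thread that is running the task.
                    if (!future.isDone()) {
                        future.cancel(true);
                    }
                    unsubscribed = true;
                }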

                @Override
                public boolean isUnsubscribed() {
                    return unsubscribed;
                }
            });

            future.thenAccept(value -> {
                if (!subscriber.isUnsubscribed()){
                    subscriber.onNext(value);
                    subscriber.onCompleted();
                }
            }).exceptionally(throwable -> {
                if (!subscriber.isUnsubscribed()) {
                    subscriber.onError(throwable);
                }
                return null;
            });
        });
    }
}
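
One caveat: according to its Javadoc, the `mayInterruptIfRunning` argument of `CompletableFuture.cancel` has no effect. The future is marked as cancelled, but the thread running the task is not interrupted. If the slow call really has to be stopped, one option is to run it on an `ExecutorService` instead and cancel the `Future` returned by `submit`. The following JDK-only sketch compares the two; the class and method names are made up for illustration, and the sleep stands in for the slow call.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class CancelInterruptSketch {

    // A task that signals when it starts and records whether it was interrupted.
    static Runnable sleeper(CountDownLatch started, CountDownLatch interrupted) {
        return () -> {
            started.countDown();
            try {
                Thread.sleep(1000); // stands in for the slow external call
            } catch (InterruptedException e) {
                interrupted.countDown();
            }
        };
    }

    // Waits up to the given time for the latch, wrapping the checked exception.
    static boolean await(CountDownLatch latch, long millis) {
        try {
            return latch.await(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns true if cancelling a CompletableFuture interrupted its worker thread.
    static boolean completableFutureInterrupts() {
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch interrupted = new CountDownLatch(1);
        CompletableFuture<Void> future = CompletableFuture.runAsync(sleeper(started, interrupted));
        await(started, 5000);
        future.cancel(true); // the flag has no effect for CompletableFuture
        return await(interrupted, 300);
    }

    // Returns true if cancelling an executor-backed Future interrupted its worker thread.
    static boolean executorFutureInterrupts() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            CountDownLatch started = new CountDownLatch(1);
            CountDownLatch interrupted = new CountDownLatch(1);
            Future<?> future = executor.submit(sleeper(started, interrupted));
            await(started, 5000);
            future.cancel(true); // interrupts the thread running the task
            return await(interrupted, 2000);
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // prints "CompletableFuture interrupted: false"
        System.out.println("CompletableFuture interrupted: " + completableFutureInterrupts());
        // prints "Executor Future interrupted: true"
        System.out.println("Executor Future interrupted: " + executorFutureInterrupts());
    }
}
```

Interruption only helps if the slow call itself responds to it, for example by blocking in an interruptible method.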