I'm trying to use RxJava to consume data from a source that keeps sending objects.
I'm wondering how to implement a retry policy for cases in which my own code throws an exception. For example, a network exception should trigger a retry with an exponential backoff policy.
Some code:
message.map(this::processMessage)
.subscribe((message)->{
//do something after mapping
});
processMessage(message)
is the method that contains the risky code that might fail. It's the part I want to retry, but I don't want to stop the observable from consuming data from the source.
Any thoughts on this?
message
.map(this::processMessage)
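.retryWhen(errors -> errors.flatMap(error -> {
    if (error instanceof IOException) {
        // Emitting any item triggers a resubscription.
        // null is allowed in RxJava 1.x only; 2.x and later need a non-null value here.
        return Observable.just(null);
    }
    // For anything else, don't retry
    return Observable.error(error);
}))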
.subscribe(
System.out::println,
error -> System.out.println("Error!")
);
Or catch the error and return a fallback value:
message.map(this::processMessage)
.onErrorReturn(error -> "Empty result")
.subscribe(result -> {});
Or log the error as a side effect (doOnError does not handle it, so the subscriber's error callback still runs):
message
.map(this::processMessage)
.doOnError(throwable -> Log.e(TAG, "Throwable " + throwable.getMessage()))
.subscribe(
System.out::println,
error -> System.out.println("Error!")
);
Untested, but note that retryWhen differs from repeatWhen: retryWhen reacts to onError, whereas repeatWhen reacts to onCompleted.
From http://blog.danlew.net/2016/01/25/rxjavas-repeatwhen-and-retrywhen-explained/: each error is flatMapped so that we can either return onNext(null) (to trigger a resubscription) or onError(error) (to avoid resubscription).
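To make that decision rule concrete without pulling in RxJava, here is a minimal plain-Java sketch of the same idea: retry only when the failure is an IOException, and let anything else propagate immediately. The class RetryDecision, the method callWithRetry and the maxRetries parameter are hypothetical names for illustration. This is a synchronous analogy, not what retryWhen does internally.

```java
import java.io.IOException;
import java.util.concurrent.Callable;

// Hypothetical helper: a synchronous analogy of "retry on IOException, fail on anything else".
final class RetryDecision {

    /** Runs the task, retrying only on IOException, with at most maxRetries extra attempts. */
    static <T> T callWithRetry(Callable<T> task, int maxRetries) throws Exception {
        int retries = 0;
        while (true) {
            try {
                return task.call();
            } catch (IOException e) {
                // Transient failure: give up only once the retry budget is spent.
                if (retries >= maxRetries) {
                    throw e;
                }
                retries++;
            }
            // Any other exception type is not caught and propagates at once.
        }
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        // The task fails twice with an IOException, then succeeds on the third attempt.
        String result = callWithRetry(() -> {
            if (++attempts[0] < 3) {
                throw new IOException("transient network failure");
            }
            return "ok";
        }, 5);
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```

In the Rx version, the same branch lives inside the flatMap: returning an item corresponds to "try again", and returning Observable.error(error) corresponds to rethrowing.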
Backoff Policy:
source.retryWhen(errors ->
    errors
        .zipWith(Observable.range(1, 3), (n, i) -> i)
        .flatMap(retryCount -> Observable.timer((long) Math.pow(5, retryCount), TimeUnit.SECONDS))
);
flatMap + timer is preferable to delay in this case because it lets us vary the delay with the number of retries. The above retries three times and delays each retry by 5^retryCount seconds (5, 25, and 125 seconds), giving you exponential backoff with just a handful of operators!
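As a quick check of the arithmetic, the delays produced by (long) Math.pow(5, retryCount) can be computed in isolation. The sketch below is plain Java with no RxJava dependency. BackoffSchedule, delaySeconds and schedule are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that mirrors the delay expression used in the retryWhen snippet.
final class BackoffSchedule {

    /** Delay in seconds before the given retry (1-based), i.e. 5^retryCount. */
    static long delaySeconds(int retryCount) {
        return (long) Math.pow(5, retryCount);
    }

    /** Delays for retries 1..maxRetries, in order. */
    static List<Long> schedule(int maxRetries) {
        List<Long> delays = new ArrayList<>();
        for (int retry = 1; retry <= maxRetries; retry++) {
            delays.add(delaySeconds(retry));
        }
        return delays;
    }

    public static void main(String[] args) {
        System.out.println(schedule(3)); // prints [5, 25, 125]
    }
}
```

With base 5 the waits grow quickly, so a smaller base such as 2 may be a better fit if more than a few retries are expected.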