How to handle error while executing Flux.map()

Victor picture Victor · Mar 26, 2016 · Viewed 18.2k times · Source

I´m trying to figure out how handle errors when mapping elements inside a Flux.

For instance, I´m parsing a CSV string into one of my business POJOs:

myflux.map(stock -> converter.convertHistoricalCSVToStockQuotation(stock));

Some of this lines might contain errors, so what I get in the log is:

 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.09 (Fri Apr 08 00:00:00 CEST 2016) H(38.419998)/L(37.849998)/O(37.970001))
 reactor.core.publisher.FluxLog:  onNext([SOME_BOGUS_QUOTE]@38.130001 (Thu Apr 07 00:00:00 CEST 2016) H(38.189999)/L(37.610001)/O(37.799999))
 reactor.core.publisher.FluxLog:  onError(java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo)
 reactor.core.publisher.FluxLog:  java.lang.IllegalArgumentException: Invalid CSV stock quotation: SOME_BOGUS_QUOTE,trololo

I read in the API some error handling methods, but most refered to returning an "error value" or using a fallback Flux, like this one:

Flux.onErrorResumeWith(myflux, x -> Mono.fromCallable(() -> ... do stuff);

However, using this with my myflux means that the whole flux is processed again.

So, is there a way to handle errors while processing particular elements (I.e ignoring them/Logging them) and keep processing the rest of the flux?

UPDATE with @akarnokd workaround

public Flux<StockQuotation> getQuotes(List<String> tickers)
{
    Flux<StockQuotation> processingFlux = Flux.fromIterable(tickers)
    // Get each set of quotes in a separate thread
    .flatMap(s -> Mono.fromCallable(() -> feeder.getCSVQuotes(s)))
    // Convert each list of raw quotes string in a new Flux<String>
    .flatMap(list -> Flux.fromIterable(list))
    // Convert the string to POJOs
    .flatMap(x -> {
            try {
                return Flux.just(converter.convertHistoricalCSVToStockQuotation(x));    
            }
            catch (IllegalArgumentException ex){
                System.out.println("Error decoding stock quotation: " + x);
                return Flux.empty();
            }
    });

    return processingFlux;
}

This works as a charm, however, as you can see the code is less elegant than before. Does not the Flux API have any method to do what this code does?

retry(...)
retryWhen(...)
onErrorResumeWith(...)
onErrorReturn(...)

Answer

akarnokd picture akarnokd · Mar 30, 2016

You need flatMap instead which let's you return an empty sequence if the processing failed:

myflux.flatMap(v -> {
    try {
        return Flux.just(converter.convertHistoricalCSVToStockQuotation(stock));
    } catch (IllegalArgumentException ex) {
        return Flux.empty();
    }
});