Spring Reactor: Mono.zip fails on empty Mono

athom · Aug 17, 2017

I am using Spring Reactor 3.1.0.M3 and have a use case where I need to merge Monos from multiple sources. I found that if one of the Monos is an empty Mono, zip fails without an error.

Example:

Mono<String> m1 = Mono.just("A");
Mono<String> m2 = Mono.just("B");
Mono<String> m3 = Mono.empty();

Mono<String> combined = Mono.zip(strings -> {
    StringBuffer sb = new StringBuffer();
    for (Object string : strings) {
        sb.append((String) string);
    }
    return sb.toString();
}, m1, m2, m3);
System.out.println("Combined " + combined.block());

When m3 is added, the combinator is skipped and the response is null. When I remove m3, it all works as expected and "AB" is returned. Is there a way I could handle this by detecting the empty Mono? Also, is there a way to have the combinator method know the type of the object instead of having to cast?

Answer

Brian Clozel · Aug 25, 2017

The zip operator doesn't work that way, and it would in fact be counter-intuitive if it did: your code expects a Tuple of 3 elements and you would only get two?!?

In this case, you're in control and only you can decide what's a good default value if none is provided (remember, null values are forbidden by the reactive streams specification).

Mono<String> m1 = Mono.just("A");
Mono<String> m2 = Mono.just("B");
Mono<String> m3 = Mono.<String>empty().defaultIfEmpty(""); // explicit type witness so the chained call compiles

// Mono.zip yields a typed Tuple3, so no casts are needed
Mono<String> combined = Mono.zip(m1, m2, m3).map(t -> {
    StringBuffer sb = new StringBuffer();
    sb.append(t.getT1());
    sb.append(t.getT2());
    sb.append(t.getT3());
    return sb.toString();
});

Edit

You seem to be confused by the nature of a Publisher type, see:

if one of the Monos is an empty Mono, zip fails without an error

and

So if I was to try and zip Mono's and for some reason one is empty, the zip would fail and I cannot seem to put in any code to safeguard against that

An empty Mono isn't a failure case: it's just that no value is emitted and it is completed successfully. You can verify that by changing the code sample:

    combined.subscribe(
            s -> System.out.println("element: " + s), // doesn't execute
            e -> System.out.println("error: " + e),   // doesn't execute
            () -> System.out.println("complete!"));   // prints

So depending on your requirements, you can:

  • apply a defaultIfEmpty operator on those 3 Mono instances, if there are convenient default values you can rely on
  • apply a defaultIfEmpty operator on the combined Mono to supply a default value, or turn the empty case into an error signal with combined.switchIfEmpty(Mono.error(...))
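
To make the two options concrete, here is a minimal sketch of both, assuming a Reactor version where Mono.zip is available (3.1 or later). The class name, the method names, the empty-string default and the IllegalStateException with its message are all illustrative choices, not something from the question or from Reactor itself.

```java
import reactor.core.publisher.Mono;

// Hypothetical helper class, used only to illustrate the two options.
class ZipEmptyHandling {

    // Option 1: give every source its own default, so zip always receives three values.
    static Mono<String> zipWithPerSourceDefaults(Mono<String> m1, Mono<String> m2, Mono<String> m3) {
        return Mono.zip(m1.defaultIfEmpty(""), m2.defaultIfEmpty(""), m3.defaultIfEmpty(""))
                .map(t -> t.getT1() + t.getT2() + t.getT3());
    }

    // Option 2: zip the sources as they are and turn an empty result into an error signal.
    // The exception type and message are assumptions made for this example.
    static Mono<String> zipOrError(Mono<String> m1, Mono<String> m2, Mono<String> m3) {
        return Mono.zip(m1, m2, m3)
                .map(t -> t.getT1() + t.getT2() + t.getT3())
                .switchIfEmpty(Mono.error(new IllegalStateException("a source was empty")));
    }

    public static void main(String[] args) {
        Mono<String> a = Mono.just("A");
        Mono<String> b = Mono.just("B");
        Mono<String> empty = Mono.empty();

        // The empty source is replaced by "", so the tuple is complete.
        System.out.println(zipWithPerSourceDefaults(a, b, empty).block()); // prints "AB"

        // The empty zip result becomes an error, which is mapped to a message here.
        String result = zipOrError(a, b, empty)
                .onErrorResume(e -> Mono.just("error: " + e.getMessage()))
                .block();
        System.out.println(result); // prints "error: a source was empty"
    }
}
```

The first variant suits cases where a neutral default exists for each source; the second keeps the sources untouched and makes the missing value visible to whoever subscribes.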