I am using Spring Reactor 3.1.0.M3 and have a use case where I need to merge Monos from multiple sources. I found that if one of the Monos is an empty Mono, zip fails without an error.
Example:
Mono<String> m1 = Mono.just("A");
Mono<String> m2 = Mono.just("B");
Mono<String> m3 = Mono.empty();
Mono<String> combined = Mono.zip(strings -> {
StringBuffer sb = new StringBuffer();
for (Object string : strings) {
sb.append((String) string);
}
return sb.toString();
}, m1, m2, m3);
System.out.println("Combined " + combined.block());
When m3 is added, the combinator is skipped and the result is null. When I remove m3, it all works as expected and "AB" is returned. Is there a way I could handle this by detecting the empty Mono? Also, is there a way to have the combinator method know the type of the object instead of having to cast?
The zip operator doesn't skip empty sources, and doing so would in fact be counter-intuitive: your code expects a Tuple of 3 elements and would only get two.
In this case, you're in control and only you can decide what's a good default value if none is provided (remember, null values are forbidden by the reactive streams specification).
Mono<String> m1 = Mono.just("A");
Mono<String> m2 = Mono.just("B");
Mono<String> m3 = Mono.<String>empty().defaultIfEmpty("");
Mono<String> combined = Mono.zip(m1, m2, m3).map(t -> {
StringBuffer sb = new StringBuffer();
sb.append(t.getT1());
sb.append(t.getT2());
sb.append(t.getT3());
return sb.toString();
});
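When the number of sources is not fixed, the same idea can be applied to a whole list. The following is only a minimal sketch: the class name ZipWithDefaults and the helper concatAll are hypothetical, and the empty string is assumed to be an acceptable default. It relies on the Iterable overload of Mono.zip, which hands the combinator an Object[], so a cast is still needed there (the fixed-arity overloads shown above are the ones that give typed tuples).

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import reactor.core.publisher.Mono;

public class ZipWithDefaults {

    // Hypothetical helper: replaces every empty source with "" and then zips them all.
    static Mono<String> concatAll(List<Mono<String>> sources) {
        List<Mono<String>> withDefaults = sources.stream()
                .map(m -> m.defaultIfEmpty("")) // assumption: "" is a sensible default
                .collect(Collectors.toList());

        // The combinator receives an Object[], one entry per source, in order.
        return Mono.zip(withDefaults, values -> Arrays.stream(values)
                .map(v -> (String) v)
                .collect(Collectors.joining()));
    }

    public static void main(String[] args) {
        Mono<String> combined = concatAll(
                Arrays.asList(Mono.just("A"), Mono.just("B"), Mono.empty()));
        System.out.println("Combined " + combined.block());
    }
}
```

Because every source now emits exactly one value, the combinator always runs, and the empty source simply contributes nothing to the resulting string.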
You seem to be confused by the nature of a Publisher type; see:
if one of the Monos is an empty Mono, zip fails without an error
and
So if I was to try and zip Mono's and for some reason one is empty, the zip would fail and I cannot seem to put in any code to safeguard against that
An empty Mono isn't a failure case: it simply emits no value and completes successfully. You can verify that by changing the code sample:
combined.subscribe(
s -> System.out.println("element: " + s), // doesn't execute
e -> System.out.println("error: " + e), // doesn't execute
() -> System.out.println("complete!")); // prints
So depending on your requirements, you can:
- apply the defaultIfEmpty operator on those 3 Mono instances, if there are convenient default values you can rely on
- apply the defaultIfEmpty operator on the combined Mono, with a default value, or even turn the empty case into an error with combined.switchIfEmpty(Mono.error(...))
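The options that act on the combined Mono can be sketched as follows. This is an illustration only: the class name EmptyZipFallbacks, the method names, the "n/a" default and the IllegalStateException are all assumptions made for the example, not something the question prescribes.

```java
import reactor.core.publisher.Mono;

public class EmptyZipFallbacks {

    // Zips three sources; the result completes empty if any source is empty.
    static Mono<String> combine(Mono<String> a, Mono<String> b, Mono<String> c) {
        return Mono.zip(a, b, c)
                .map(t -> t.getT1() + t.getT2() + t.getT3());
    }

    // Falls back to a default value when the zipped result is empty.
    static Mono<String> combineOrDefault(Mono<String> a, Mono<String> b, Mono<String> c) {
        return combine(a, b, c).defaultIfEmpty("n/a"); // "n/a" is an arbitrary choice
    }

    // Turns the empty case into an error signal instead.
    static Mono<String> combineOrError(Mono<String> a, Mono<String> b, Mono<String> c) {
        return combine(a, b, c).switchIfEmpty(
                Mono.error(new IllegalStateException("at least one source was empty")));
    }

    public static void main(String[] args) {
        Mono<String> m1 = Mono.just("A");
        Mono<String> m2 = Mono.just("B");
        Mono<String> m3 = Mono.empty();

        System.out.println("with default: " + combineOrDefault(m1, m2, m3).block());

        combineOrError(m1, m2, m3).subscribe(
                s -> System.out.println("element: " + s),
                e -> System.out.println("error: " + e.getMessage()));
    }
}
```

Which variant fits depends on whether a missing value is an expected situation (use a default) or a sign that something upstream went wrong (use an error).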