I have a method that returns Mono<Output>:
interface Processor {
    Mono<Output> process(Input input);
}
And I want to execute this processor method for each element of a collection:
List<Input> inputs = // get inputs
Processor processor = // get processor
List<Mono<Output>> outputs = inputs.stream().map(processor::process).collect(toList());
But instead of a List<Mono<Output>>, I want to get a Mono<List<Output>> that will contain the aggregated results.
I tried reduce, but the final result looks very clumsy:
Mono<List<Output>> result = inputs.stream().map(processor::process)
    .reduce(Mono.just(new ArrayList<>()),
        (monoListOfOutput, monoOfOutput) ->
            monoListOfOutput.flatMap(list -> monoOfOutput.map(output -> {
                list.add(output);
                return list;
            })),
        (left, right) ->
            left.flatMap(leftList -> right.map(rightList -> {
                leftList.addAll(rightList);
                return leftList;
            })));
Can I achieve this with less code?
If you don't need a Stream for some other reason, you can create a Flux from your inputs, map each element through the processor, and collect the results into a list:
Flux.fromIterable(inputs).flatMap(processor::process).collectList();
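One caveat: flatMap subscribes to the inner Monos eagerly and emits results as they arrive, so if the processor is asynchronous the list may not follow the order of inputs. If order matters, flatMapSequential (or concatMap, which processes one element at a time) should keep it. Below is a minimal, self-contained sketch of that variant. The Input and Output records, the CollectMonos class, the processAll helper, and the lambda processor are hypothetical stand-ins for illustration, since the question does not show the real types; it assumes Java 16+ and reactor-core on the classpath.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class CollectMonos {
    // Hypothetical stand-ins for the question's Input and Output types.
    record Input(int id) {}
    record Output(String value) {}

    interface Processor {
        Mono<Output> process(Input input);
    }

    // Turns a list of inputs into a single Mono holding all outputs.
    static Mono<List<Output>> processAll(List<Input> inputs, Processor processor) {
        return Flux.fromIterable(inputs)
                // Subscribes to the inner Monos eagerly, but emits their
                // results in the order of the source elements.
                .flatMapSequential(processor::process)
                .collectList();
    }

    public static void main(String[] args) {
        // Hypothetical processor that wraps the input id in an Output.
        Processor processor = input -> Mono.just(new Output("out-" + input.id()));

        List<Input> inputs = List.of(new Input(1), new Input(2), new Input(3));

        // block() is used only to print the result in this demo;
        // in reactive code you would keep composing on the Mono instead.
        List<Output> outputs = processAll(inputs, processor).block();
        System.out.println(outputs);
    }
}
```

If the order of the results is irrelevant, the plain flatMap version above is fine and is the simplest option.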