How do I sum the values in a Reactor Flux stream?

Mark picture Mark · Feb 27, 2018 · Viewed 7.6k times · Source

Suppose I have a repository with a findAll() method that returns an Iterable of State, where State is a class representing a US state that has two fields (with getter/setters): name, and population.

I want to get the sum of the population fields for all States in my Flux. I create a Flux from an Iterable, as follows:

Flux f = Flux.fromIterable(stateRepo.findAll());

I've got my Flux, but I don't know of a good way to sum up its values. I've tried something like

int total = 0;
f.map(s -> s.getPopulation()).subscribe(p -> total += v);
return total;

However, the compiler says that total "should be final or effectively final". Adding final obviously won't work, because I'm trying to add to it.

How do I go about summing (or any other aggregate function) on a Flux?

Answer

mroman picture mroman · Feb 27, 2018

Use reduce method:

@GetMapping("/populations")
    public Mono<Integer> getPopulation() {
        return Flux.fromIterable(stateRepo.findAll())
                .map(s -> s.getPopulation())
                .reduce(0, (x1, x2) -> x1 + x2)
                .map(this::someFunction); // here you can handle the sum
    }