I'm using a Reactor Flux in Spring to send parallel requests to a service. This is a very simplified version of it:
Flux.fromIterable(customers)
    .flatMap { customer ->
        client.call(customer)
    } ...
I was wondering how I could cancel this flux, as in, grab a reference to the flux somehow and tell it to shut down.
As you probably know, with reactive objects, all operators are lazy. This means execution of the pipeline is delayed until the moment you subscribe to the reactive stream.
So, in your example, there is nothing to cancel yet because nothing is happening at that point.
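To make the laziness concrete, here is a minimal sketch, assuming Reactor is on the classpath. The class `LazyFluxDemo`, the counter `calls`, and `fakeCall` (a stand-in for `client.call(customer)`) are hypothetical names used only for illustration:

```java
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class LazyFluxDemo {
    // Counts how many times the stand-in "remote call" actually ran.
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical stand-in for client.call(customer).
    static String fakeCall(String customer) {
        calls.incrementAndGet();
        return "ok:" + customer;
    }

    // Assembling the pipeline only describes the work; nothing runs yet.
    static Flux<String> buildPipeline(List<String> customers) {
        return Flux.fromIterable(customers)
                .flatMap(c -> Mono.fromCallable(() -> fakeCall(c)));
    }

    public static void main(String[] args) {
        Flux<String> pipeline = buildPipeline(List.of("a", "b", "c"));
        System.out.println("calls before subscribe: " + calls.get()); // 0

        // Only subscribing triggers the calls (synchronously in this sketch).
        pipeline.subscribe();
        System.out.println("calls after subscribe: " + calls.get()); // 3
    }
}
```

Until `subscribe()` is called, the counter stays at zero, which is why there is nothing to cancel at that stage.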
But suppose your example were extended to:
Disposable disp = Flux.fromIterable(customers)
    .flatMap(customer -> client.call(customer))
    .subscribe();
Then, as you can see, subscribe() returns a Disposable object that you can use to cancel the entire thing if you want, e.g.
disp.dispose();
The documentation of dispose() says:
Cancel or dispose the underlying task or resource.
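As a rough illustration of what disposing looks like at runtime, here is a sketch that uses `Flux.interval` as a stand-in for a long-running source. The class `DisposeDemo`, its fields, and the timings are assumptions made for the example, not part of the original question:

```java
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

public class DisposeDemo {
    static final AtomicLong received = new AtomicLong();
    static final AtomicBoolean cancelled = new AtomicBoolean();

    // Starts an endless ticking source and returns the handle used to cancel it.
    static Disposable start() {
        return Flux.interval(Duration.ofMillis(10))
                .doOnCancel(() -> cancelled.set(true))          // runs when the subscriber cancels
                .subscribe(tick -> received.incrementAndGet()); // subscribe() returns a Disposable
    }

    public static void main(String[] args) throws InterruptedException {
        Disposable disp = start();
        Thread.sleep(200);   // let some ticks arrive

        disp.dispose();      // cancel the subscription
        Thread.sleep(50);    // give any in-flight tick time to finish
        long afterDispose = received.get();
        Thread.sleep(200);   // wait to see whether more ticks arrive

        System.out.println("disposed: " + disp.isDisposed());
        System.out.println("cancel signal seen upstream: " + cancelled.get());
        System.out.println("ticks stopped: " + (received.get() == afterDispose));
    }
}
```

Note that `dispose()` sends a cancel signal upstream; how quickly work actually stops depends on how the source reacts to that signal.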
There’s another section of the documentation that says the following:
These variants [of subscribe()] return a reference to the subscription that you can use to cancel the subscription when no more data is needed. Upon cancellation, the source should stop producing values and clean up any resources it created. This cancel and clean-up behavior is represented in Reactor by the general-purpose Disposable interface.
Therefore, canceling a stream is not free of complications on the reactive object side, because you want to leave the world in a consistent state if the stream is canceled in the middle of its processing. For example, if you were in the process of building something, you may need to discard partial aggregation results, close files and channels, and release memory or any other resources you hold, potentially undoing changes or compensating for them.
You may want to read the documentation on cleanup so that you also consider what you can do on the reactive object side:
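Flux<String> bridge = Flux.create(sink -> {
    sink.onRequest(n -> channel.poll(n))      // poll the channel when the subscriber requests data
        .onCancel(() -> channel.cancel())     // invoked first, for a cancel signal only
        .onDispose(() -> channel.close());    // invoked for complete, error, or cancel signals
});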
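The snippet above depends on a `channel` object that is not shown. As a self-contained sketch of the same idea, the following records which cleanup hooks fire when the subscriber disposes. The class `CleanupDemo` and the `events` list are hypothetical and exist only to make the hooks observable:

```java
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CleanupDemo {
    // Records what happened, in order, so the hooks can be inspected afterwards.
    static final List<String> events = new CopyOnWriteArrayList<>();

    // A source that emits one value and never completes on its own.
    static Flux<String> bridge() {
        return Flux.create(sink -> {
            sink.onCancel(() -> events.add("cancel"));   // cancel signal only
            sink.onDispose(() -> events.add("dispose")); // complete, error, or cancel
            sink.next("first");
        });
    }

    public static void main(String[] args) {
        Disposable disp = bridge().subscribe(value -> events.add("got " + value));

        // Cancelling from the subscriber side triggers the cleanup hooks.
        disp.dispose();
        System.out.println(events);
    }
}
```

In a real source, the two hooks are where you would stop producing values and release files, channels, or partial results.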