I'm new to Spring WebFlux. I wrote a controller as follows:
@RestController
public class FirstController
{
    @GetMapping("/first")
    public Mono<String> getAllTweets()
    {
        return Mono.just("I am First Mono");
    }
}
I know that one of the benefits of the reactive approach is backpressure, which can balance the request or response rate. I want to understand how to get a backpressure mechanism in Spring WebFlux.
To understand how backpressure works in the current implementation of the WebFlux framework, we have to recap the transport layer it uses by default. Communication between a browser and a server (and usually server-to-server communication as well) normally goes over a TCP connection, and WebFlux uses that same transport between a client and the server. Then, to pin down what backpressure control means, we have to recall how the Reactive Streams specification describes backpressure:
The basic semantics define how the transmission of stream elements is regulated through back-pressure.
So, from that statement, we may conclude that in Reactive Streams backpressure is a mechanism that regulates demand by transmitting (notifying) how many elements the recipient can consume. And here we have a tricky point: TCP works with a byte abstraction rather than a logical-element abstraction. What we usually mean by backpressure control is control over the number of logical elements sent to or received from the network. Even though TCP has its own flow control (see the meaning here and animation there), that flow control still operates on bytes rather than on logical elements.
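To make "demand in logical elements" concrete, here is a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces, which mirror the Reactive Streams ones. DemandDemo, RangePublisher and run are hypothetical names invented for this illustration; everything runs on a single thread and no network is involved. It only shows that a publisher emits no more elements than the subscriber has requested.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class DemandDemo {

    /** Hypothetical publisher: emits 1..count, never more than has been requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand;      // outstanding demand, counted in elements
                private boolean emitting; // guards against re-entrant request() calls
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    // Sketch only: a compliant publisher must reject n <= 0 via onError.
                    demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n;
                    if (emitting) return; // called from inside onNext; the loop below continues
                    emitting = true;
                    while (!done && demand > 0 && next <= count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** Subscribes with a fixed batch size and returns the sequence of signals observed. */
    static List<String> run(int count, int batch) {
        List<String> log = new ArrayList<>();
        new RangePublisher(count).subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;
            private int received;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("request(" + batch + ")");
                s.request(batch);
            }

            @Override
            public void onNext(Integer item) {
                log.add("onNext(" + item + ")");
                if (++received % batch == 0) { // batch consumed, signal new demand
                    log.add("request(" + batch + ")");
                    subscription.request(batch);
                }
            }

            @Override
            public void onError(Throwable t) {
                log.add("onError");
            }

            @Override
            public void onComplete() {
                log.add("onComplete");
            }
        });
        return log;
    }

    public static void main(String[] args) {
        run(5, 2).forEach(System.out::println);
    }
}
```

With run(5, 2) the log alternates between one request(2) and at most two onNext signals. That element-level accounting is exactly what TCP's byte-oriented flow control cannot express.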
In the current implementation of the WebFlux module, the backpressure is regulated by the transport flow control, but it does not expose the real demand of the recipient. In order to finally see the interaction flow, please see the following diagram:
For simplicity, the above diagram shows the communication between two microservices where the left one sends streams of data, and the right one consumes that stream. The following numbered list provides a brief explanation of that diagram:
As we may notice from the diagram above, the demand exposed by the recipient differs from the demand of the sender (demand here is counted in logical elements). In other words, the two demands are isolated from each other: each works only for the WebFlux <-> business logic (Service) interaction, and much less of the backpressure is exposed for the Service A <-> Service B interaction.
All of this means that backpressure control in WebFlux is not as fair as we might expect.
If we still want some (unfair) backpressure control in WebFlux, we can get it with Project Reactor operators such as limitRate(). The following example shows how we may use that operator:
@PostMapping("/tweets")
public Mono<Void> postAllTweets(Flux<Tweet> tweetsFlux) {
    return tweetService.process(tweetsFlux.limitRate(10))
                       .then();
}
As we may see from the example, the limitRate() operator lets us define the number of elements to be prefetched at once. That means that even if the final subscriber requests Long.MAX_VALUE elements, the limitRate() operator splits that demand into chunks and does not allow more than that to be consumed at once. We can do the same for the sending side:
@GetMapping("/tweets")
public Flux<Tweet> getAllTweets() {
    return tweetService.retreiveAll()
                       .limitRate(10);
}
The above example shows that even if WebFlux requests more than 10 elements at a time, limitRate() throttles the demand to the prefetch size and prevents more than the specified number of elements from being consumed at once.
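The demand-splitting idea can be sketched without any framework. The block below is a simplified illustration using the JDK's java.util.concurrent.Flow interfaces, not Reactor's actual limitRate() implementation. ChunkedDemandDemo, ChunkingSubscriber and upstreamRequests are hypothetical names. The sketch assumes a single thread and treats the first downstream request as unbounded demand.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class ChunkedDemandDemo {

    /** Hypothetical wrapper: replaces the downstream demand with fixed-size chunks. */
    static final class ChunkingSubscriber<T> implements Flow.Subscriber<T> {
        private final Flow.Subscriber<? super T> downstream;
        private final int chunk;
        private Flow.Subscription upstream;
        private int receivedInChunk;

        ChunkingSubscriber(Flow.Subscriber<? super T> downstream, int chunk) {
            this.downstream = downstream;
            this.chunk = chunk;
        }

        @Override
        public void onSubscribe(Flow.Subscription s) {
            upstream = s;
            downstream.onSubscribe(new Flow.Subscription() {
                private boolean started;

                @Override
                public void request(long n) {
                    // Simplification: the first downstream request counts as unbounded demand.
                    if (!started) {
                        started = true;
                        upstream.request(chunk);
                    }
                }

                @Override
                public void cancel() {
                    upstream.cancel();
                }
            });
        }

        @Override
        public void onNext(T item) {
            downstream.onNext(item);
            if (++receivedInChunk == chunk) { // chunk consumed, ask for the next one
                receivedInChunk = 0;
                upstream.request(chunk);
            }
        }

        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
        }

        @Override
        public void onComplete() {
            downstream.onComplete();
        }
    }

    /** Emits 1..count through the wrapper and returns every request(n) the source saw. */
    static List<Long> upstreamRequests(int count, int chunk) {
        List<Long> seen = new ArrayList<>();
        Flow.Publisher<Integer> source = subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private long demand;
            private boolean emitting; // guards against re-entrant request() calls
            private boolean done;

            @Override
            public void request(long n) {
                seen.add(n);
                if (done) return;
                demand += n;
                if (emitting) return;
                emitting = true;
                while (!done && demand > 0 && next <= count) {
                    demand--;
                    subscriber.onNext(next++);
                }
                emitting = false;
                if (!done && next > count) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
        // The final subscriber asks for "everything", as an unbounded consumer would.
        source.subscribe(new ChunkingSubscriber<Integer>(new Flow.Subscriber<Integer>() {
            @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
            @Override public void onNext(Integer item) { }
            @Override public void onError(Throwable t) { }
            @Override public void onComplete() { }
        }, chunk));
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(upstreamRequests(25, 10));
    }
}
```

Although the final subscriber requests Long.MAX_VALUE, the source only ever sees requests of 10. Reactor's real operator is more refined; per its documentation it replenishes early, once about 75% of the prefetch has been consumed, so the request sizes it produces differ from this sketch.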
Another option is to implement our own Subscriber or to extend BaseSubscriber from Project Reactor. For instance, the following is a naive example of how we may do that:
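import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;

class MyCustomBackpressureSubscriber<T> extends BaseSubscriber<T> {

    int consumed;
    final int limit = 5;

    @Override
    protected void hookOnSubscribe(Subscription subscription) {
        // ask for the first batch
        request(limit);
    }

    @Override
    protected void hookOnNext(T value) {
        // do business logic there
        consumed++;
        // once the whole batch has been consumed, ask for the next one
        if (consumed == limit) {
            consumed = 0;
            request(limit);
        }
    }
}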
To achieve logical-element backpressure across network boundaries, we need an appropriate protocol. Fortunately, there is one: the RSocket protocol. RSocket is an application-level protocol that allows transferring real demand across network boundaries. There is an RSocket-Java implementation of that protocol that allows us to set up an RSocket server. For server-to-server communication, the same RSocket-Java library provides a client implementation as well. To learn more about how to use RSocket-Java, please see the following examples here. For browser-server communication, there is an RSocket-JS implementation, which allows wiring up streaming communication between browser and server through WebSocket.
Nowadays there are a few frameworks built on top of the RSocket protocol.
One of them is the Proteus project, which offers full-fledged microservices built on top of RSocket. Proteus is also well integrated with the Spring framework, so we can now achieve fair backpressure control (see examples there).