What I am trying to achieve:
Send as many HTTP requests as possible, in parallel, to a very reliable third-party service, from an aggressive Flux.
Background:
The third party service is very reliable and can sustain a very high number of requests. So far, I am the only client of this third party service. I am invited to hit the third party server as hard as possible. The third party service, which I have no control over, does not offer any bulk/list API. I can only send requests one by one. On their side, each request takes a constant one second to process.
What did I try:
Here is the configuration of my http client, as well as the logic to send the http requests (hopefully as many requests, as fast as possible)
client
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
import org.eclipse.jetty.io.ClientConnector;
import org.springframework.http.client.reactive.JettyClientHttpConnector;

@Bean
public WebClient webClient(WebClient.Builder builder) {
    final var clientConnector = new ClientConnector();
    final var httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector));
    return builder
            .baseUrl("http://some-host:8080/api/path")
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .clientConnector(new JettyClientHttpConnector(httpClient))
            .build();
}
The flux:
// some very fast flux; this line is just an example
Flux<String> inputFlux = Flux.interval(Duration.ofMillis(1)).map(i -> "transform to request payload number " + i);
// send as many requests as fast as possible; the default flatMap concurrency is 256
Flux<String> resultFlux = inputFlux.flatMap(oneInput -> webClient.post().bodyValue(oneInput).retrieve().bodyToMono(String.class));
// doing something with the result
return resultFlux.map(oneResult -> doSomething(oneResult));
Using this, I asked the third party service and they gave me a number N, my observed number of requests per second.
First observation: the default concurrency of flatMap should be 256, and since the third party takes a constant one second to process each request, I would have expected a rate N of about 256 requests per second.
However, I am nowhere close: the third party service told me I am at 16-ish requests per second.
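The gap between the expected ~256 and the observed ~16 requests per second can be framed with Little's law: steady-state throughput equals the number of in-flight requests divided by the per-request latency. A minimal sketch (class and method names are mine, purely for illustration, not from the post):

```java
// Little's law: throughput = in-flight requests / per-request latency.
// Hypothetical helper to frame the numbers from the post.
public class ThroughputEstimate {
    static double requestsPerSecond(int inFlightRequests, double latencySeconds) {
        return inFlightRequests / latencySeconds;
    }

    public static void main(String[] args) {
        // With flatMap's default concurrency of 256 and the service's 1 s latency:
        System.out.println(requestsPerSecond(256, 1.0)); // ~256 rps expected
        // Conversely, observing ~16 rps at 1 s latency implies that only
        // ~16 requests are actually in flight at any time:
        System.out.println(requestsPerSecond(16, 1.0));
    }
}
```

In other words, measuring ~16 rps against a 1-second service strongly suggests that something downstream of flatMap is capping the effective concurrency at about 16, regardless of what flatMap is told.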
Additional try:
As I want to scale the number of requests sent to the third party server, I tuned the flatMap concurrency as follows:
try number 1:
// some very fast flux, this line is just an example
Flux<String> inputFlux = Flux.interval(Duration.ofMillis(1)).map(i -> "transform to request payload number " + i);
//BUMP concurrency to 2048, an 8x increase over the default 256
Flux<String> resultFlux = inputFlux.flatMap(oneInput -> webClient.post().bodyValue(oneInput).retrieve().bodyToMono(String.class), 2048);
//doing something with the result
return resultFlux.map(oneResult -> doSomething(oneResult));
When tuning the concurrency parameter of flatMap, I would have expected a proportionally higher request rate, here roughly 8 times higher. But not at all: I am still at 16-ish requests per second.
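One way to see why raising the flatMap concurrency alone cannot help: if the HTTP client (or anything between you and the service) caps the number of concurrent connections, that cap bounds the in-flight requests no matter how many inner publishers flatMap subscribes to. Below is a stand-alone simulation in plain Java, with a Semaphore playing the role of the client's connection pool; the 16-connection cap, the scaled-down latency, and all names are assumptions for illustration only:

```java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

// Simulation: many "flatMap" subscriptions (tasks) funneled through a small
// "connection pool" (semaphore). The pool, not the task count, bounds parallelism.
public class ConcurrencyCapDemo {
    static int run(int totalRequests, int connectionLimit) {
        Semaphore connectionPool = new Semaphore(connectionLimit);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger maxInFlight = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(64);
        CountDownLatch done = new CountDownLatch(totalRequests);
        for (int i = 0; i < totalRequests; i++) {
            workers.submit(() -> {
                try {
                    connectionPool.acquire();          // each request needs a free "connection"
                    int now = inFlight.incrementAndGet();
                    maxInFlight.accumulateAndGet(now, Math::max);
                    Thread.sleep(2);                   // scaled-down stand-in for the 1 s service latency
                    inFlight.decrementAndGet();
                    connectionPool.release();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }
        try { done.await(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        workers.shutdown();
        return maxInFlight.get();
    }

    public static void main(String[] args) {
        // Submitting 512 "requests" changes nothing: in-flight work never exceeds the cap of 16.
        System.out.println("max in-flight requests: " + run(512, 16));
    }
}
```

Raising `totalRequests` (the analogue of bumping flatMap's concurrency from 256 to 2048) leaves the measured maximum parallelism unchanged; only raising `connectionLimit` would.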
Question:
Is flatMap the best way to achieve this? (Please let me know if I should use something else.)
If it is, why is it not scaling? Why is it not working?
Answer:
If you need a very fast Flux which generates values infinitely and as quickly as possible, you can just use Flux.generate:

Flux<String> inputFlux = Flux.generate(sink -> sink.next("constant payload"));

If you need the values to increase, you can use the stateful variant:

Flux<String> inputFlux = Flux.generate(() -> 0L, (i, sink) -> {
    sink.next("transform to request payload number " + i);
    return i + 1;
});

Both of these will run synchronously, producing values as fast as possible. They can be imagined as a tight for(;;) loop, so all the pitfalls that apply to a tight for loop apply here as well, so be careful.

The most likely reason why you cannot generate more requests to the 3rd party service is that once you flatMap to the WebClient, the actual request handling is governed by the scheduling policy of the jetty-reactive-httpclient library. I am just guessing, but I assume you will need to tweak the number of concurrent connections, and possibly the IO threads, keep-alive, pipelining, etc. of the HttpClient library (used internally by jetty-reactive-httpclient). Also, the 3rd party service could be applying some form of not-so-obvious rate limiting (say, not allowing keep-alive, or not supporting HTTP pipelining).

So in essence, changing the concurrency and prefetch settings of the flatMap() operator does not automatically translate into a higher-throughput configuration for the Jetty HttpClient library doing the actual IO work. You can read about Jetty's HttpClient here: https://www.eclipse.org/jetty/documentation/jetty-11/programming-guide/index.html

Also, when analyzing, you should observe the traffic on your machine too (how many sockets are opened, the amount of traffic originating from your machine, the state of the sockets, etc.), because your requests could still be slowed down by some intermediary, whether it is a reverse proxy, a router, a switch, or even the OS or antivirus software on your machine.
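Concretely, the per-destination connection limits mentioned above can be raised on the Jetty HttpClient before it is handed to the connector. Here is a sketch of the bean from the question with two such knobs added; the values 256 and 2048 are illustrative assumptions meant to match the flatMap concurrency, and whether they actually remove the bottleneck depends on the transport and the remote service:

```java
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
import org.eclipse.jetty.io.ClientConnector;
import org.springframework.context.annotation.Bean;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.JettyClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;

// Sketch only: the limit values are assumptions to experiment with,
// not a verified fix for the 16 rps observation.
@Bean
public WebClient webClient(WebClient.Builder builder) {
    final var clientConnector = new ClientConnector();
    final var httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector));
    // Allow as many concurrent connections to the single destination as the
    // flatMap concurrency, so requests are not serialized onto a few sockets.
    httpClient.setMaxConnectionsPerDestination(256);
    // Let surplus requests queue per destination while all connections are busy.
    httpClient.setMaxRequestsQueuedPerDestination(2048);
    return builder
            .baseUrl("http://some-host:8080/api/path")
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .clientConnector(new JettyClientHttpConnector(httpClient))
            .build();
}
```

While experimenting, compare the rate reported by the third party against the number of established sockets on your machine (e.g. with netstat) to confirm the connection count is actually what changed.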