Reactive flatMap to send HTTP requests not scaling with "concurrency" parameter


What I am trying to achieve:

Send as many HTTP requests as possible, in parallel, to a very reliable third-party service from a very fast Flux

Background:

The third party service is very reliable and can sustain a very high number of requests. So far, I am the only client of this third party service. I am invited to hit the third party server as hard as possible. The third party service, which I have no control over, does not offer any bulk/list API. I can only send requests one by one. On their side, each request takes a constant one second to process.

What did I try:

Here is the configuration of my HTTP client, as well as the logic to send the HTTP requests (hopefully as many requests as possible, as fast as possible).

the client:

@Bean
public WebClient webClient(WebClient.Builder builder) {
    final var clientConnector = new ClientConnector();
    final var httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector));

    return builder
            .baseUrl("http://some-host:8080/api/path")
            .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
            .clientConnector(new JettyClientHttpConnector(httpClient))
            .build();
}

the flux:

// some very fast flux, this line is just an example
Flux<String> inputFlux = Flux.interval(Duration.ofMillis(1)).map(i -> "transform to request payload number " + i);
// send as many requests as fast as possible; flatMap's default concurrency should be 256
Flux<String> resultFlux = inputFlux.flatMap(oneInput -> webClient.post().bodyValue(oneInput).retrieve().bodyToMono(String.class));
// doing something with the result
return resultFlux.map(oneResult -> doSomething(oneResult));

Using this, I asked the third-party service team for my actual request rate; they gave me a number N, my number of requests per second.

First observation: the default concurrency for flatMap is 256. And since the third party takes one second to process each request, I would have expected a rate N of 256 requests per second.
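The expectation follows Little's Law: in steady state, throughput equals the number of in-flight requests divided by the per-request latency. A small sketch of the arithmetic, using the numbers from above (nothing here is measured; it is just the back-of-the-envelope calculation):

```java
// Little's Law: steady-state throughput = in-flight requests / per-request latency.
public class ExpectedThroughput {
    public static void main(String[] args) {
        int concurrency = 256;        // flatMap's default concurrency
        double latencySeconds = 1.0;  // the third party's constant processing time
        double expectedRps = concurrency / latencySeconds;
        System.out.println(expectedRps + " requests/second"); // 256.0 requests/second
    }
}
```

This is also why an observed rate far below 256 points at a smaller effective concurrency somewhere downstream of flatMap, not at the latency.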

However, I am nowhere close. The third-party service told me I am at 16-ish requests per second.

Additional try:

As I want to scale the number of requests sent to the third-party server, I am tuning the concurrency parameter as follows:

try number 1:

// some very fast flux, this line is just an example
Flux<String> inputFlux = Flux.interval(Duration.ofMillis(1)).map(i -> "transform to request payload number " + i);
// BUMP to 2048, an 8x increase
Flux<String> resultFlux = inputFlux.flatMap(oneInput -> webClient.post().bodyValue(oneInput).retrieve().bodyToMono(String.class), 2048);
// doing something with the result
return resultFlux.map(oneResult -> doSomething(oneResult));

When tuning the concurrency parameter of flatMap, I would have expected a higher number of requests sent, here around 8×N. But not at all: I am still at 16-ish requests per second.

Question:

Is flatMap the best way to achieve this (please let me know if I should change to something else)?

If it is, why is it not scaling? Why is it not working?


Answer by kimec:

If you need a very fast Flux which generates values infinitely and as quickly as possible, you can just use

Flux<Integer> inputFlux = Mono.just(1).repeat();

If you need the values to increase, you can use

Flux<Integer> inputFlux = Flux.generate(
        () -> 1,
        (s, sink) -> {
            sink.next(s);
            return s + 1;
        });

Both of these will run synchronously, producing values as fast as possible. They can be imagined as a tight for(;;) loop, so all the pitfalls that apply to a tight for loop apply here as well; be careful.
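To illustrate the tight-loop point outside of Reactor, here is a plain-Java analogue (my own sketch, not Reactor internals): a bounded queue plays the role that backpressure plays for a Flux. The producer spins in a tight loop, but the bound makes it park instead of spinning forever.

```java
import java.util.concurrent.ArrayBlockingQueue;

// Plain-Java analogue of backpressure: a bounded buffer makes a
// tight producer loop block instead of running away unboundedly.
public class BackpressureSketch {
    public static void main(String[] args) throws Exception {
        ArrayBlockingQueue<Integer> queue = new ArrayBlockingQueue<>(256);
        Thread producer = new Thread(() -> {
            int i = 0;
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    queue.put(i++); // blocks once the buffer is full
                } catch (InterruptedException e) {
                    return;
                }
            }
        });
        producer.start();
        Thread.sleep(200); // give the producer time to fill the buffer
        System.out.println(queue.size()); // prints 256: producer is parked, not spinning
        producer.interrupt();
        producer.join();
    }
}
```

Without the bound (an unbounded queue, or a plain for(;;) loop), the same producer would pin a CPU core and grow memory until something breaks, which is exactly the pitfall to watch for with an unthrottled synchronous generator.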

The most likely reason why you cannot generate more requests to the 3rd party service is that once you flatMap to WebClient, the actual request handling is governed by the scheduling policy of the jetty-reactive-httpclient library. I am just guessing, but I assume you will need to tweak the number of concurrent connections, and possibly the IO threads, keep-alive, pipelining, etc. of the HttpClient library (used internally by jetty-reactive-httpclient). Also, the 3rd party service could be using some form of not-so-obvious rate limiting (say, not allowing keep-alive, or not supporting HTTP pipelining).
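If per-destination connection limits are indeed the bottleneck, a sketch of the knobs to try when building the client (assuming Jetty 11's HttpClient API; the quoted defaults may vary by version, so verify them against your Jetty release):

```java
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.dynamic.HttpClientTransportDynamic;
import org.eclipse.jetty.io.ClientConnector;

ClientConnector clientConnector = new ClientConnector();
HttpClient httpClient = new HttpClient(new HttpClientTransportDynamic(clientConnector));
// Cap on concurrent connections opened to one host:port (default 64);
// with a 1-second service time per request, this alone caps throughput
// well below the 256 requests/second that flatMap's concurrency allows.
httpClient.setMaxConnectionsPerDestination(256);
// Requests beyond the connection cap wait in a per-destination queue (default 1024);
// the 2048-concurrency variant above can overflow it, so raise it too.
httpClient.setMaxRequestsQueuedPerDestination(4096);
```

The tuned httpClient would then be passed to JettyClientHttpConnector exactly as in the question's bean.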

So in essence, changing the concurrency and prefetch settings of the flatMap() operator does not automatically translate into a higher-throughput configuration for the Jetty HttpClient library doing the actual IO work. You can read about Jetty's HttpClient here: https://www.eclipse.org/jetty/documentation/jetty-11/programming-guide/index.html.

Also, when analyzing, you should observe the traffic on your machine too (how many sockets are open, the amount of traffic originating from your machine, the state of the sockets, etc.), because your requests could still be slowed down by some intermediary, whether it is a reverse proxy, a router, a switch, or even the OS or antivirus software on your machine.