Why is Flux Reactor buffering so many elements?


I have a pipeline which, at high level does:

Flux.generate()                           // A. item: a list of size 1000
    .flatMapIterable(Function.identity()) // B. item: one Java record (x 1000)
    .filter()                             // same
    .flatMapSequential()                  // C. item: up to one Java record per input (slow)
    // more stuff
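
For concreteness, here is a minimal self-contained repro with the same shape (all the logic is placeholder; the real source, predicate, and slow operation are different):

import java.time.Duration;
import java.util.List;
import java.util.function.Function;
import java.util.stream.IntStream;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class Repro {
    public static void main(String[] args) {
        Flux.<List<Integer>>generate(sink -> sink.next(         // A. emit lists of 1000
                IntStream.range(0, 1000).boxed().toList()))
            .take(50)                                           // 50 lists = 50,000 (B) items
            .flatMapIterable(Function.identity())               // B. one record at a time
            .filter(x -> x % 2 == 0)                            // placeholder predicate
            .flatMapSequential(x -> Mono.just(x)                // C. simulated slow operation,
                .delayElement(Duration.ofMillis(50)))           //    roughly 20/sec
            .blockLast();
    }
}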

The flatMapSequential() step is slow because it invokes a slow operation (that is working as intended; I am not looking to speed it up).

What happens is that Reactor very quickly reads in 50,000 (B) items, seemingly without ever deciding that is too many, then slowly feeds them through the flatMapSequential() step, at a rate of maybe 20/sec at best. After the source completes, the buffer Reactor has accumulated takes many minutes to drain. It looks like it would have buffered even more if the source were larger.

If I add doOnRequest(r -> log.info("r:{}", r)), I see that flatMapSequential() requests 1 each time it completes an item, and flatMapIterable() requests 1 at a slower pace, but still too fast. I also tried adding limitRate() in a few places, but it does not seem to help at all: it changes the requested amounts, but the overall behavior of reading far ahead does not change.
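
For reference, this is roughly how I instrumented the repro above (the limitRate() placements and values are just examples of what I tried; log is an SLF4J logger, imports as in the repro):

Flux.<List<Integer>>generate(sink -> sink.next(
        IntStream.range(0, 1000).boxed().toList()))
    .take(50)
    .doOnRequest(r -> log.info("A r:{}", r))        // requests for whole lists
    .flatMapIterable(Function.identity())
    .limitRate(100)                                 // tried various values here...
    .doOnRequest(r -> log.info("B r:{}", r))        // requests for individual records
    .filter(x -> x % 2 == 0)
    .limitRate(10)                                  // ...and here
    .flatMapSequential(x -> Mono.just(x)
        .delayElement(Duration.ofMillis(50)))
    .blockLast();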

Is there a way to make it buffer less and use RAM only for what it actually needs? For example, in (A) it could read 2 lists, a total of 2,000 (B) items and about 50 sec worth of flatMapSequential() work, then read the next (A) list only as the first drains, and so on. With that scheme it would need to cache at most 2,000 items, instead of essentially the whole source (~50,000 items), which is what it does now.
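
Something like this is what I had in mind, assuming the overloads that take explicit prefetch/concurrency are the right knobs (the values are guesses; placeholders reused from the repro above):

Flux.<List<Integer>>generate(sink -> sink.next(
        IntStream.range(0, 1000).boxed().toList()))
    .take(50)
    .flatMapIterable(Function.identity(), 2)        // pull at most 2 lists at a time from A
    .filter(x -> x % 2 == 0)
    .flatMapSequential(x -> Mono.just(x)
        .delayElement(Duration.ofMillis(50)),
        2, 1)                                       // cap in-flight inners and inner prefetch
    .blockLast();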
