Why is `Flux.generate()` never requested?


Consider the following code:

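  // Imports the snippet needs (assumes JUnit 5 and Reactor 3 on the classpath)
  import java.util.List;
  import java.util.concurrent.atomic.AtomicInteger;
  import java.util.function.Function;
  import java.util.stream.IntStream;
  import org.junit.jupiter.api.Test;
  import reactor.core.publisher.Flux;

  @Test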
  void testTooMuchBuffering2() {
    var counter = new AtomicInteger(0);
    var lastCollected = new AtomicInteger(-1);
    Flux.<List<Integer>>generate(
            sink -> {
              int val = counter.getAndAdd(1000);
              if (val >= 10_000) {
                sink.complete();
                return;
              }
              // Read 1000 rows from a "database"
              System.out.println("\nGenerating " + (val + 1000));
              sink.next(IntStream.range(val, val + 1000).boxed().toList());
            })
        .doOnSubscribe(s -> System.out.println("Subscribed"))
        .doOnRequest(r -> System.out.println("flatMapIterable: " + r)) // Never executes
        .flatMapIterable(Function.identity())
        .doOnRequest(r -> System.out.println("blockLast: " + r))
        .blockLast();
  }

The doOnRequest() placed above flatMapIterable() never gets executed. The output of this test is:

Subscribed
blockLast: 9223372036854775807
Generating 1000
Generating 2000
Generating 3000
Generating 4000
Generating 5000
Generating 6000
Generating 7000
Generating 8000
Generating 9000
Generating 10000

Why isn't the lambda in doOnRequest() invoked here? In other cases too much is requested, yet here nothing is ever requested. Why?
