How to process a Flux in batches and have each batch processed in parallel


There are 5,000,000 entities in my database, which I access through a reactive driver (R2DBC). I want to fetch them 100,000 at a time, split each portion into bundles of 1,000 entities, and send each bundle on for processing. When a portion of 100,000 has been processed, fetch the next 100,000. If I fetch more than 100,000 at once, I get an OOM.

BIG_DATA -get 100,000-> make batch -1,000 (parallel)-> process. If successful, get the next 100,000.
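For reference, the sizes in the question work out to the following counts (plain arithmetic, not Reactor code):

```java
public class BatchMath {
    public static void main(String[] args) {
        long total = 5_000_000L;    // entities in the database
        long batchSize = 100_000L;  // fetched per portion
        long bundleSize = 1_000L;   // sent downstream per bundle

        long batches = total / batchSize;              // portions to fetch
        long bundlesPerBatch = batchSize / bundleSize; // bundles inside one portion

        System.out.println(batches);         // 50
        System.out.println(bundlesPerBatch); // 100
    }
}
```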

But when I try to simulate this, all of the original data is extracted first, and only then is it divided into bundles:

public static void main(String[] args) throws InterruptedException {
    Flux.range(0, 12)
            .limitRate(6)
            .doOnNext(in -> log.info("NEW"))
            .parallel(2)
            .runOn(Schedulers.fromExecutor(Executors.newFixedThreadPool(2)))
            .collect(() -> new ArrayList<Integer>(), List::add)
            .flatMap(DataMigrationServiceApplication::check)
            .subscribe();

    Thread.sleep(10000);
}

private static Mono<List<Integer>> check(List<Integer> input) {
    return Mono.defer(() -> Mono.just(input))
            .doOnNext(in -> log.info("IN {}", in))
            .subscribeOn(Schedulers.parallel());
}

Log:

16:53:57.871 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.874 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.875 [main] INFO DataMigrationServiceApplication -- NEW
16:53:57.881 [pool-1-thread-2] INFO DataMigrationServiceApplication -- IN [1, 3, 5, 7, 9, 11]
16:53:57.881 [pool-1-thread-1] INFO DataMigrationServiceApplication -- IN [0, 2, 4, 6, 8, 10]

I have tried limitRate, buffer, and window.

Please help.


There are 2 answers below.

Answer by Влад Савостиков:

Well, I did this:

generator.window(20)
        .flatMap(inFlux -> inFlux.parallel(4)
                .runOn(/*scheduler*/)
                .collect(() -> new ArrayList<Integer>(), List::add)
                .flatMap(/*process*/))
        .subscribe();

This creates windows of 20 elements; then 4 rails (in parallel) take elements from each window round-robin. Each rail takes 5 elements and processes them.
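The round-robin distribution described above can be sketched without Reactor: element n of a window goes to rail n % rails, so with 4 rails and a 20-element window each rail ends up with 5 elements (this mirrors ParallelFlux's default round-robin dispatch; the class and variable names here are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

public class RoundRobinSketch {
    public static void main(String[] args) {
        int rails = 4;
        int windowSize = 20;

        // one list per rail, simulating the per-rail element streams
        List<List<Integer>> perRail = new ArrayList<>();
        for (int r = 0; r < rails; r++) perRail.add(new ArrayList<>());

        // round-robin dispatch: element n goes to rail n % rails
        for (int n = 0; n < windowSize; n++) {
            perRail.get(n % rails).add(n);
        }

        for (int r = 0; r < rails; r++) {
            System.out.println("rail " + r + ": " + perRail.get(r));
        }
        // each rail holds windowSize / rails = 5 elements
    }
}
```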

Answer by Patrick Hooijer:

There are multiple Reactor operators in your chain that by default request many more items than they need. This makes the chain more efficient: by keeping a small buffer (reactor.bufferSize.small, 256 by default), they minimize the time spent waiting for new items. collect() requests even more, as it needs all items before it can emit its finished collection.

You can show these request signals by using the log() operator:

        Flux.range(0, 12)
                .log("Step A")
                .limitRate(6)
                .log("Step B")
                .parallel(2)
                .runOn(Schedulers.fromExecutor(Executors.newFixedThreadPool(2)))
                .log("Step C")
                .collect(() -> new ArrayList<Integer>(), List::add)
                .log("Step D")
                .flatMap(DataMigrationServiceApplication::check)
                .log("Step E")
                .subscribe();

Log:
2024-03-03 12:38:36.728 INFO  [main] Step E - request(unbounded)
2024-03-03 12:38:36.728 INFO  [main] Step D - | request(unbounded)
2024-03-03 12:38:36.729 INFO  [main] Step C - request(unbounded)
2024-03-03 12:38:36.731 INFO  [main] Step E - request(unbounded)
2024-03-03 12:38:36.731 INFO  [main] Step D - | request(unbounded)
2024-03-03 12:38:36.731 INFO  [main] Step C - request(unbounded)
2024-03-03 12:38:36.731 INFO  [main] Step B - | request(256)
2024-03-03 12:38:36.731 INFO  [main] Step A - | request(6)
2024-03-03 12:38:36.734 INFO  [main] Step A - | request(5)
2024-03-03 12:38:36.734 INFO  [main] Step A - | request(5)
2024-03-03 12:38:37.751 INFO  [parallel-2] - IN [0, 2, 4, 6, 8, 10]
2024-03-03 12:38:37.751 INFO  [parallel-1] - IN [1, 3, 5, 7, 9, 11]

You see steps C, D and E twice because of the 2 parallel tracks (I'm ignoring all other signals for now).

In this case, subscribe() requests an unbounded amount of items (step E), which is first limited by the internal buffer of parallel() (step B). limitRate() does reduce it to 6 (step A), but almost immediately needs to request more to satisfy the demand.
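The request(6)/request(5) pattern at step A matches Reactor's usual prefetch replenish optimization: limitRate(highTide) uses a default low tide of highTide - (highTide >> 2) (roughly 75% of the high tide) and replenishes in requests of that size. A sketch of that arithmetic, assuming the documented default:

```java
public class LimitRateTides {
    public static void main(String[] args) {
        int highTide = 6;                         // limitRate(6)
        int lowTide = highTide - (highTide >> 2); // default replenish size

        System.out.println(highTide); // initial request: 6
        System.out.println(lowTide);  // replenish requests: 5
    }
}
```

This is why the log shows an initial request(6) followed by request(5) replenishments.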

The solution is to limit the internal buffers of each of the operators. parallel(), ParallelFlux.flatMap() and ParallelFlux.runOn() all have overloads where you can specify the prefetch amount. By setting the prefetch amounts to 1, all these operators do the minimal buffering. We will also replace collect() with buffer(int), as that requests only enough items to satisfy its demand:

    public static void main(String[] args) throws InterruptedException {
        int batchSize = 6;
        int bundleSize = 2;
        int parallelism = 2;

        Flux.range(0, 12)
                .log("Step A")
                .limitRate(batchSize)
                .log("Step B")
                .buffer(bundleSize)
                .log("Step C")
                .parallel(parallelism, 1)
                .runOn(Schedulers.fromExecutor(Executors.newFixedThreadPool(parallelism)), 1)
                .log("Step D")
                .flatMap(DataMigrationServiceApplication::check, false, 1, 1)
                .subscribe();
        Thread.sleep(10000);
    }

    private static Mono<List<Integer>> check(List<Integer> input) {
        return Mono.just(input)
                .delayElement(Duration.ofMillis(1000)) // to simulate a slow operation
                .doOnNext(in -> log.info("IN {}", in));
    }

Log:
2024-03-03 12:40:31.084 INFO Step D - request(1)
2024-03-03 12:40:31.087 INFO Step D - request(1)
2024-03-03 12:40:31.087 INFO Step C - request(1)
2024-03-03 12:40:31.087 INFO Step B - | request(2)
2024-03-03 12:40:31.088 INFO Step A - | request(6)
2024-03-03 12:40:31.089 INFO Step C - request(1)
2024-03-03 12:40:31.089 INFO Step B - | request(2)
2024-03-03 12:40:31.090 INFO Step C - request(1)
2024-03-03 12:40:31.090 INFO Step B - | request(2)
2024-03-03 12:40:31.090 INFO Step A - | request(5)
2024-03-03 12:40:31.090 INFO Step D - onNext([2, 3])
2024-03-03 12:40:31.095 INFO Step C - request(1)
2024-03-03 12:40:31.095 INFO Step B - | request(2)
2024-03-03 12:40:31.098 INFO Step C - request(1)
2024-03-03 12:40:31.099 INFO Step B - | request(2)
2024-03-03 12:40:31.099 INFO Step A - | request(5)
2024-03-03 12:40:32.109 INFO DataMigrationServiceApplication - IN [2, 3]
2024-03-03 12:40:32.111 INFO Step D - request(1)
2024-03-03 12:40:32.111 INFO Step C - request(1)
2024-03-03 12:40:32.111 INFO Step B - | request(2)
2024-03-03 12:40:32.111 INFO DataMigrationServiceApplication - IN [0, 1]
2024-03-03 12:40:32.113 INFO Step D - request(1)
2024-03-03 12:40:32.113 INFO Step C - request(1)
2024-03-03 12:40:32.113 INFO Step B - | request(2)
2024-03-03 12:40:33.123 INFO DataMigrationServiceApplication - IN [6, 7]
2024-03-03 12:40:33.123 INFO DataMigrationServiceApplication - IN [4, 5]
2024-03-03 12:40:33.124 INFO Step D - request(1)
2024-03-03 12:40:33.124 INFO Step D - request(1)
2024-03-03 12:40:34.129 INFO DataMigrationServiceApplication - IN [10, 11]
2024-03-03 12:40:34.129 INFO DataMigrationServiceApplication - IN [8, 9]

Step A still requests all 12 items, because 12 items are not enough to fill all the buffers in the subsequent steps. For larger data sets, however, step A will only request more items as the downstream buffers are emptied.