I have some imperative code that processes records on 20 threads in parallel:
    IntStream.range(0, 20)
        .forEach(t -> {
            Runnable runnable = () -> {
                int records = RANGE;
                while (records > 0) {
                    records = processRecords(t, Thread.currentThread().getId());
                }
            };
            new Thread(runnable, "processor-thread-".concat(String.valueOf(t)))
                .start();
        });
I tried to convert this to reactive code, but the reactive version creates only 10 threads instead of 20. What am I doing wrong? Currently the imperative version is much faster than the reactive one.
    ExecutorService executor = Executors.newFixedThreadPool(THREADS);
    IntStream.range(0, THREADS)
        .forEach(t -> Multi
            .createBy()
            .repeating()
            .uni(AtomicInteger::new, contacts -> processMessages(t))
            .until(List::isEmpty)
            .onItem()
            .disjoint()
            .runSubscriptionOn(executor)
            .subscribe()
            .with(item -> System.out.println(Thread.currentThread().getName()))
        );
I would not structure the program like that. I would represent each "processing" as a Uni and then create a pipeline processing them concurrently.
So, first, let's isolate the processing:
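A minimal sketch of what that isolation could look like, assuming Mutiny is on the classpath. The class name `IsolatedProcessing`, the constants `RANGE` and `BATCH`, and the simulated `processRecords` are hypothetical stand-ins for the blocking work in the question, not the original code:

```java
import io.smallrye.mutiny.Uni;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class IsolatedProcessing {

    static final int RANGE = 100; // hypothetical number of records per worker
    static final int BATCH = 30;  // hypothetical batch size

    // Hypothetical stand-in for the blocking call: consumes up to BATCH
    // records and returns how many it handled (0 when nothing is left).
    static int processRecords(int worker, AtomicInteger remaining) {
        int batch = Math.min(BATCH, remaining.get());
        remaining.addAndGet(-batch);
        return batch;
    }

    // One worker's whole processing loop as a single Uni. Nothing runs until
    // the Uni is subscribed; the supplier then executes on an executor thread.
    static Uni<Integer> process(int worker, Executor executor) {
        return Uni.createFrom()
                .item(() -> {
                    AtomicInteger remaining = new AtomicInteger(RANGE);
                    int total = 0;
                    int records;
                    do {
                        records = processRecords(worker, remaining);
                        total += records;
                    } while (records > 0);
                    return total;
                })
                .runSubscriptionOn(executor);
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            // Blocks the main thread until the worker has finished.
            int total = process(0, executor).await().indefinitely();
            System.out.println("worker 0 processed " + total + " records");
        } finally {
            executor.shutdown();
        }
    }
}
```

Keeping the blocking loop inside the supplier means the Uni stays lazy: calling `process(...)` only describes the work, and the thread is chosen at subscription time by `runSubscriptionOn`.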
Then, let's build the pipeline:
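One way such a pipeline could be written, again as a sketch that assumes Mutiny is on the classpath. `ConcurrentPipeline`, `runAll`, and the trivial `process` body (which only reports the thread it ran on) are illustrative placeholders for the real processing:

```java
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.Uni;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConcurrentPipeline {

    static final int THREADS = 20;

    // Hypothetical placeholder for one worker's blocking processing;
    // the supplier runs on an executor thread once the Uni is subscribed.
    static Uni<String> process(int worker, ExecutorService executor) {
        return Uni.createFrom()
                .item(() -> "worker " + worker + " ran on " + Thread.currentThread().getName())
                .runSubscriptionOn(executor);
    }

    // Emits 0..THREADS-1, turns each index into a Uni, and keeps up to
    // THREADS of them subscribed at the same time. Results arrive in
    // completion order, which is not necessarily index order.
    static List<String> runAll(ExecutorService executor) {
        return Multi.createFrom().range(0, THREADS)
                .onItem().transformToUni(worker -> process(worker, executor))
                .merge(THREADS)
                .collect().asList()
                .await().indefinitely();
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(THREADS);
        try {
            runAll(executor).forEach(System.out::println);
        } finally {
            executor.shutdown();
        }
    }
}
```

Here a single pipeline replaces the 20 separately subscribed `Multi`s from the question, and the concurrency level is stated explicitly in `merge(THREADS)` rather than implied by how many streams were started.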
transformToUni(...).merge(concurrency) subscribes to up to concurrency (here 20) Unis at the same time. Each of them runs its subscription (and therefore the processing) on a thread from the executor.