I'm exploring the Reactor library in Java and ran into ParallelFlux<T> parallel().
To play around with it, I created an app that reads from a file and writes its contents, line by line, to other files.
My file, file.txt, contains 1M lines of book titles.
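To reproduce the experiment without the original data, a test file of similar size can be generated. This is a minimal sketch, assuming any list of titles will do. It cycles through a few titles copied from the sample console output in this post. The class name GenerateTitles and the default output path are hypothetical; the app itself loads file.txt from the classpath, so the generated file has to be placed there.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Stream;

public class GenerateTitles
{
    // A few titles copied from the sample console output; any text would do.
    private static final List<String> TITLES = List.of(
            "For a Breath I Tarry", "The Man Within", "No Country for Old Men",
            "Behold the Man", "Number the Stars", "His Dark Materials");

    // Writes lineCount lines to target, cycling through TITLES.
    static void generate(Path target, int lineCount) throws IOException
    {
        try (BufferedWriter out = Files.newBufferedWriter(target, StandardCharsets.UTF_8))
        {
            for (int i = 0; i < lineCount; i++)
            {
                out.write(TITLES.get(i % TITLES.size()));
                out.newLine();
            }
        }
    }

    public static void main(String[] args) throws IOException
    {
        // Hypothetical default location; copy the result to the classpath as file.txt.
        Path target = Paths.get(args.length > 0 ? args[0] : "file.txt");
        generate(target, 1_000_000);
        try (Stream<String> written = Files.lines(target))
        {
            System.out.println("Wrote " + written.count() + " lines to " + target);
        }
    }
}
```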
The app:
package cdq.cdl.filereader;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class Main
{
    public static void main(String[] args)
    {
        String path = ClassLoader.getSystemResource("file.txt").getPath();
        // One PrintWriter per worker thread. Several threads add entries concurrently,
        // so a ConcurrentHashMap is used instead of a plain HashMap.
        Map<String, PrintWriter> printWriters = new ConcurrentHashMap<>();
        AtomicReference<Long> startMillis = new AtomicReference<>();
        AtomicReference<Long> endMillis = new AtomicReference<>();

        Flux.using(
                () -> new FileReader(path),
                // Emit the file line by line
                fileReader -> Flux.fromStream(new BufferedReader(fileReader).lines()),
                // Close the reader when the Flux terminates or is cancelled
                fileReader -> {
                    try
                    {
                        fileReader.close();
                    }
                    catch (IOException e)
                    {
                        throw new RuntimeException(e);
                    }
                }
            )
            // Split into rails (one per CPU core by default); each rail runs on a Schedulers.parallel() worker
            .parallel()
            .runOn(Schedulers.parallel())
            .doOnNext(line -> {
                String threadName = Thread.currentThread().getName();
                if (printWriters.containsKey(threadName)) {
                    printWriters.get(threadName).println(line);
                } else {
                    // First line seen by this thread: open <threadName>.txt
                    FileWriter newFileWriter;
                    try
                    {
                        newFileWriter = new FileWriter(threadName + ".txt");
                    }
                    catch (IOException e)
                    {
                        throw new RuntimeException(e);
                    }
                    // autoFlush = true: every println() is flushed immediately
                    PrintWriter newPrintWriter = new PrintWriter(newFileWriter, true);
                    newPrintWriter.println(line);
                    printWriters.put(threadName, newPrintWriter);
                }
            })
            // Merge the rails back into a single Flux
            .sequential()
            .doFirst(() -> {
                printThreadName("doFirst");
                startMillis.set(System.currentTimeMillis());
            })
            .doOnComplete(() -> {
                printThreadName("doComplete");
                endMillis.set(System.currentTimeMillis());
            })
            .doFinally(x -> {
                printThreadName("doFinally");
                printThreadName(String.valueOf(endMillis.get() - startMillis.get()));
                // Close every per-thread writer once all rails are done
                printWriters.values().forEach(PrintWriter::close);
            })
            .subscribe(Main::printThreadName);
    }

    private static void printThreadName(String message)
    {
        System.out.println(Thread.currentThread().getName() + "\t\t" + message);
    }
}
When I run this without .parallel().runOn(Schedulers.boundedElastic()), it takes around 10,000 milliseconds. I expected that using multiple threads (8 in my case) would make it several times faster, but in reality it is even slower. At first I was writing to a single file, so I assumed that PrintWriter locks the file internally and the threads have to wait for each other anyway. That's why I created a map of writers (one per thread), but it didn't help.
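The "one writer per thread" idea can also be tried without Reactor, using only the JDK, which helps separate the cost of the file writes from the cost of the reactive pipeline. This is a minimal sketch, assuming a plain fixed thread pool stands in for Schedulers.parallel(); the class PerThreadWriters and its writeAll method are hypothetical names. It uses ConcurrentHashMap.computeIfAbsent so that each worker thread atomically gets exactly one writer, and it keeps autoFlush = true to mirror the setup in the question.

```java
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PerThreadWriters
{
    // Writes every line to <outputDir>/<threadName>.txt, where threadName is the pool
    // thread that handled the line. Returns the files that were created.
    static List<Path> writeAll(List<String> lines, Path outputDir, int threads) throws InterruptedException
    {
        Map<String, PrintWriter> writers = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (String line : lines)
        {
            pool.execute(() -> {
                String threadName = Thread.currentThread().getName();
                // computeIfAbsent is atomic on ConcurrentHashMap: one writer per thread
                PrintWriter writer = writers.computeIfAbsent(threadName, name -> {
                    try
                    {
                        // autoFlush = true, as in the question
                        return new PrintWriter(new FileWriter(outputDir.resolve(name + ".txt").toFile()), true);
                    }
                    catch (IOException e)
                    {
                        throw new UncheckedIOException(e);
                    }
                });
                // only the owning thread ever writes through this writer
                writer.println(line);
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(1, TimeUnit.MINUTES))
        {
            throw new IllegalStateException("writes did not finish within 1 minute");
        }
        // all tasks are done, so no thread is still using a writer
        writers.values().forEach(PrintWriter::close);
        return writers.keySet().stream()
                .map(name -> outputDir.resolve(name + ".txt"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) throws Exception
    {
        Path dir = Files.createTempDirectory("per-thread-writers");
        List<String> lines = IntStream.range(0, 10_000)
                .mapToObj(i -> "title " + i)
                .collect(Collectors.toList());
        long start = System.nanoTime();
        List<Path> files = writeAll(lines, dir, 8);
        System.out.printf("%d files in %d ms%n", files.size(), (System.nanoTime() - start) / 1_000_000);
    }
}
```

Timing this against a single-threaded loop over the same lines gives a Reactor-free baseline for how much parallel writing to separate files helps on a given machine.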
Is it possible that I/O operations can't be performed asynchronously, so writing to multiple files makes no difference?
Here is an example of the console output on a smaller dataset:
main doFirst
parallel-5 For a Breath I Tarry
parallel-5 The Man Within
parallel-6 No Country for Old Men
parallel-6 Behold the Man
parallel-6 Oh! To be in England
parallel-6 The Wind's Twelve Quarters
parallel-6 Moab Is My Washpot
parallel-6 Number the Stars
parallel-6 When the Green Woods Laugh
parallel-6 His Dark Materials
parallel-6 Some Buried Caesar
parallel-6 Ego Dominus Tuus
parallel-6 Fame Is the Spur
parallel-6 I Know Why the Caged Bird Sings
parallel-6 The Golden Apples of the Sun
parallel-3 Postern of Fate
parallel-3 For a Breath I Tarry
parallel-3 No Highway
parallel-3 doComplete
parallel-3 doFinally
parallel-3 13
Quite often a single thread processes many lines in a row, but with the 1M-line file I could also find places where the output was much more shuffled.
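To put a number on how "shuffled" the output is, one option is to count how many consecutive lines each thread printed before another thread took over. This is a minimal sketch, assuming each console line starts with the thread name followed by whitespace, as produced by printThreadName; the class RunLengths and its Run record are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

public class RunLengths
{
    // One uninterrupted stretch of console lines printed by the same thread.
    record Run(String thread, int length) {}

    // Groups consecutive console lines by the thread name in front of them.
    static List<Run> runs(List<String> consoleLines)
    {
        List<Run> result = new ArrayList<>();
        String current = null;
        int length = 0;
        for (String line : consoleLines)
        {
            // the thread name is everything before the first whitespace
            String thread = line.strip().split("\\s+", 2)[0];
            if (thread.equals(current))
            {
                length++;
            }
            else
            {
                if (current != null)
                {
                    result.add(new Run(current, length));
                }
                current = thread;
                length = 1;
            }
        }
        if (current != null)
        {
            result.add(new Run(current, length));
        }
        return result;
    }

    public static void main(String[] args)
    {
        List<String> sample = List.of(
                "parallel-5\t\tFor a Breath I Tarry",
                "parallel-5\t\tThe Man Within",
                "parallel-6\t\tNo Country for Old Men",
                "parallel-6\t\tBehold the Man",
                "parallel-3\t\tPostern of Fate");
        runs(sample).forEach(r -> System.out.println(r.thread() + " x" + r.length()));
    }
}
```

Feeding it the full console log (for example, read with Files.readAllLines) and looking at the longest runs shows how long one thread keeps the output to itself.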