For a homework assignment, I need to implement an external sort that can sort a 10 GB file with 1 GB of physical memory. Currently, I'm using a BufferedReader on the large file and constructing/sorting the smaller files sequentially. Then, in the merge step, I have a BufferedReader open for every small file and a single BufferedWriter for the large final file, and I write to the final file using the merge-k-sorted-lists algorithm with a PriorityQueue. This works, but it needs to be faster (to take half as much time, to be exact).
The entire splitting step happens sequentially, and the entire merging step also happens sequentially. I think I can at least split and sort the files in parallel using multiple threads, each working on its own region of the file. The memory used would then be mostly memory-mapped file data, and the OS would take care of optimally paging data in and out of physical memory. I was wondering whether there is a way to do this in Java using parallel streams. Something along the lines of:
largeFile.splitInParallel(100000)
    .lines()
    .map((s) -> new LineObject(s))
    .sorted()
    .forEach(writeSmallFileToDisk)
where the argument to splitInParallel is the number of lines I want in each of the smaller files. To make the intent concrete, a sketch of the shape I have in mind with real APIs is below. Any help is appreciated, thanks!
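Something like this, with an ExecutorService standing in for the imaginary splitInParallel (a rough sketch only: sortAndWrite is a placeholder, the final partial chunk is omitted, and in practice the number of in-flight chunks would need to be bounded to stay within memory):

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ParallelSplit {

    public static void main(String[] args) throws Exception {
        int batchSize = 100_000;
        ExecutorService pool =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        try (BufferedReader br = new BufferedReader(new FileReader(args[0]))) {
            List<String> chunk = new ArrayList<>(batchSize);
            int batchId = 0;
            for (String line; (line = br.readLine()) != null; ) {
                chunk.add(line);
                if (chunk.size() == batchSize) {
                    List<String> toSort = chunk;   // effectively final for the lambda
                    int id = batchId++;
                    // Sort and write this chunk on a pool thread while the main
                    // thread keeps reading the next chunk.
                    pool.submit(() -> sortAndWrite(toSort, id));
                    chunk = new ArrayList<>(batchSize);
                }
            }
        }
        pool.shutdown();
    }

    // Placeholder: sort the chunk and write it out as one small file.
    private static void sortAndWrite(List<String> chunk, int id) {
    }
}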
EDIT: My code is
import java.io.*;
import java.util.Arrays;
import java.util.PriorityQueue;

public class Main {

    private static final int BUFFER_SIZE = 10_000_000;
    /**
     * A main method to run examples.
     *
     * @param args the input file path and the batch size
     */
    public static void main(String[] args) throws IOException {
        System.out.println("Starting...");
        String file = args[0];
        int batchSize = Integer.parseInt(args[1]);
        try {
            FileInputStream fin = new FileInputStream(file);
            BufferedInputStream bis = new BufferedInputStream(fin, BUFFER_SIZE);
            BufferedReader br = new BufferedReader(new InputStreamReader(bis), BUFFER_SIZE);

            int lineNumber = 0;
            int batchId = 0;
            String line;
            TaxiEntry[] batch = new TaxiEntry[batchSize];
            int i = 0;
            while ((line = br.readLine()) != null) {
                TaxiEntry taxiEntry = parseLine(line);
                batch[i++] = taxiEntry;
                lineNumber++;
                if (lineNumber % batchSize == 0) {
                    // The batch is full: sort it and write it out as one run file.
                    // Overwrite (rather than append) so reruns don't corrupt the batches.
                    String outputFileName = String.format("batches/batch_%d.txt", batchId);
                    BufferedWriter bf = new BufferedWriter(new FileWriter(outputFileName), BUFFER_SIZE);
                    Arrays.parallelSort(batch);
                    for (int j = 0; j < i; j++) {
                        bf.write(batch[j].toString());
                        if (j != i - 1) {
                            bf.newLine();
                        }
                    }
                    batchId++;
                    i = 0;
                    bf.close();
                }
            }
            // Sort and write the final, possibly partial batch.
            if (i > 0) {
                String outputFileName = String.format("batches/batch_%d.txt", batchId);
                BufferedWriter bf = new BufferedWriter(new FileWriter(outputFileName), BUFFER_SIZE);
                Arrays.parallelSort(batch, 0, i);
                for (int j = 0; j < i; j++) {
                    bf.write(batch[j].toString());
                    if (j != i - 1) {
                        bf.newLine();
                    }
                }
                batchId++;
                bf.close();
            }
            br.close();
            System.out.println("Processed " + lineNumber + " lines");
            merge(batchId);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static void merge(int numBatches) throws IOException {
        System.out.println("Starting merge...");
        // Open a reader for every batch file.
        BufferedReader[] readers = new BufferedReader[numBatches];
        for (int i = 0; i < numBatches; i++) {
            String file = String.format("batches/batch_%d.txt", i);
            FileInputStream fin = new FileInputStream(file);
            BufferedInputStream bis = new BufferedInputStream(fin, BUFFER_SIZE);
            BufferedReader br = new BufferedReader(new InputStreamReader(bis), BUFFER_SIZE);
            readers[i] = br;
        }

        // Merge: the queue holds at most one entry per batch file at a time.
        String outputFileName = "result/final.txt";
        BufferedWriter bf = new BufferedWriter(new FileWriter(outputFileName), BUFFER_SIZE);
        PriorityQueue<IndexedTaxiNode> curEntries = new PriorityQueue<>();
        for (int i = 0; i < numBatches; i++) {
            BufferedReader reader = readers[i];
            String next = reader.readLine();
            if (next != null) {
                TaxiEntry curr = parseLine(next);
                curEntries.add(new IndexedTaxiNode(curr, i));
            }
        }
        while (!curEntries.isEmpty()) {
            // Take the head of the queue (the next entry in sorted order)
            // and refill the queue from the batch file it came from.
            IndexedTaxiNode maxNode = curEntries.remove();
            bf.write(maxNode.toString());
            bf.newLine();
            int index = maxNode.index;
            String next = readers[index].readLine();
            if (next != null) {
                TaxiEntry newEntry = parseLine(next);
                curEntries.add(new IndexedTaxiNode(newEntry, index));
            }
        }
        bf.close();
        for (BufferedReader reader : readers) {
            reader.close();
        }
    }
    public static TaxiEntry parseLine(String line) {
        // The sort key is the 17th comma-separated field (index 16).
        return new TaxiEntry(line, Double.parseDouble(line.split(",")[16]));
    }
}
Doing some timings, I found that the time to read from disk and the time to sort are of a similar order of magnitude.
I used a modest file of about 150 MB for the timings. It might not be a good idea to have lots of threads all reading from disk at the same time.
My suggestion, for what it's worth, is to have one thread that does all of the disk reading and another thread that concurrently does the sorting. I could only see a way to do this for the splitting and sorting phase.
For the splitting phase, you cannot read all the segments in one go, because that would consume too much memory, so you read a few segments, write a few, read a few, and so on. The idea of this interleaving is to keep the disk continuously busy by delegating the sorting to another thread: hopefully, by the time the disk is ready to write a segment, the sort on that segment has completed, so the disk never has to wait.
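A minimal sketch of that pipeline, assuming the reader and the sorter are connected by a bounded hand-off queue (the constants BATCH_SIZE and MAX_MESSAGE_QUEUE and the file names are illustrative), might look like this:

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class SplitSortPipeline {

    private static final int BATCH_SIZE = 100_000;  // lines per segment (illustrative)
    private static final int MAX_MESSAGE_QUEUE = 4; // max batches in flight at once
    private static final List<String> POISON_PILL = new ArrayList<>(); // end-of-input marker

    public static void main(String[] args) throws Exception {
        BlockingQueue<List<String>> queue = new ArrayBlockingQueue<>(MAX_MESSAGE_QUEUE);

        // Sorter/writer thread: sorts each batch and writes it out while the
        // reader keeps the disk busy fetching the next one.
        Thread sorter = new Thread(() -> {
            try {
                int batchId = 0;
                for (List<String> batch; (batch = queue.take()) != POISON_PILL; batchId++) {
                    Collections.sort(batch); // stand-in for sorting on the parsed key
                    try (BufferedWriter out = new BufferedWriter(
                            new FileWriter(String.format("batches/batch_%d.txt", batchId)))) {
                        for (String line : batch) {
                            out.write(line);
                            out.newLine();
                        }
                    }
                }
            } catch (InterruptedException | IOException e) {
                throw new RuntimeException(e);
            }
        });
        sorter.start();

        // Reader (main thread): does all of the disk reading, handing off full
        // batches. put() blocks when the queue is full, which caps memory at
        // roughly BATCH_SIZE * MAX_MESSAGE_QUEUE lines.
        try (BufferedReader in = new BufferedReader(new FileReader(args[0]))) {
            List<String> batch = new ArrayList<>(BATCH_SIZE);
            for (String line; (line = in.readLine()) != null; ) {
                batch.add(line);
                if (batch.size() == BATCH_SIZE) {
                    queue.put(batch);
                    batch = new ArrayList<>(BATCH_SIZE);
                }
            }
            if (!batch.isEmpty()) {
                queue.put(batch);
            }
        }
        queue.put(POISON_PILL); // signal end of input
        sorter.join();
    }
}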
An outline for the splitting and sorting phase is shown above, without covering any edge cases. The amount of memory used would be proportional to BATCH_SIZE * MAX_MESSAGE_QUEUE.

Unfortunately, I don't see a way to apply concurrency to the phase of merging the multiple files. The disk is just the disk, so it cannot go any faster even with multiple threads.
You could also try investigating a parallel quicksort, but the problem with quicksort is choosing a pivot so that the partitions end up a reasonable size; a toy fork/join version is sketched below.
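For illustration only, here is a minimal fork/join quicksort over double keys with a median-of-three pivot (the class name and cutoff are made up for the sketch):

import java.util.Arrays;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;

public class ParallelQuicksort extends RecursiveAction {

    private static final int CUTOFF = 10_000; // below this, sort sequentially
    private final double[] a;
    private final int lo, hi;                 // inclusive bounds

    private ParallelQuicksort(double[] a, int lo, int hi) {
        this.a = a;
        this.lo = lo;
        this.hi = hi;
    }

    public static void sort(double[] a) {
        ForkJoinPool.commonPool().invoke(new ParallelQuicksort(a, 0, a.length - 1));
    }

    @Override
    protected void compute() {
        if (hi - lo < CUTOFF) {
            Arrays.sort(a, lo, hi + 1);
            return;
        }
        // Median-of-three pivot: a bad pivot gives lopsided partitions,
        // which is exactly what kills the parallel speedup.
        double pivot = median(a[lo], a[(lo + hi) >>> 1], a[hi]);
        int i = lo, j = hi;
        while (i <= j) {
            while (a[i] < pivot) i++;
            while (a[j] > pivot) j--;
            if (i <= j) {
                double tmp = a[i]; a[i] = a[j]; a[j] = tmp;
                i++;
                j--;
            }
        }
        // Sort both partitions in parallel; elements between j and i equal the pivot.
        invokeAll(new ParallelQuicksort(a, lo, j), new ParallelQuicksort(a, i, hi));
    }

    private static double median(double x, double y, double z) {
        return Math.max(Math.min(x, y), Math.min(Math.max(x, y), z));
    }
}

Note that Arrays.parallelSort, which the splitting code above already uses, sidesteps pivot selection entirely by using a parallel sort-merge algorithm instead.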