Split Large File Into Smaller Files Using Parallel Stream in Java

For a homework assignment, I need to implement external sorting such that I can sort a 10GB file with 1GB physical memory. Currently, I'm using a BufferedReader on the large file and constructing/sorting the smaller files sequentially. Then in the merge step, I have BufferedReaders open for all small files and a single BufferedWriter for the large final file where I write to the large file using the merge k sorted lists algorithm with a PriorityQueue. This works, but it needs to be faster (take half as much time to be exact).

The entire splitting step happens sequentially, and the entire merging step also happens sequentially. I think I can at least split and sort the files in parallel using multiple threads, with the memory being mostly memory-mapped files so that the OS takes care of optimally paging data in and out of physical memory. I was wondering if there is a way for Java to do this using parallel streams. Something along the lines of:

largeFile.splitInParallel(100000)
.lines()
.map((s) -> new LineObject(s))
.sorted()
.forEach(writeSmallFileToDisk)

where the argument to splitInParallel is the number of lines I want in the smaller files. Any help is appreciated, thanks!
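
To make the idea concrete, here is roughly what I imagine such a pipeline would have to do under the hood (splitInParallel does not exist; the class name, batch size, file paths, and the plain string sort below are all placeholders for my real code, and like my current code it assumes the batches/ directory already exists): the reading stays sequential, while the sorting and writing of each batch is handed off to a thread pool.

import java.io.BufferedReader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelSplitSketch {

    private static final int BATCH_SIZE = 100_000; // lines per small file (placeholder value)

    public static void main(String[] args) throws Exception {
        ExecutorService pool =
                Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        List<Future<?>> pending = new ArrayList<>();

        // Reading stays sequential; sorting and writing of each batch runs in the pool.
        try (BufferedReader reader = Files.newBufferedReader(Path.of(args[0]))) {
            List<String> batch = new ArrayList<>(BATCH_SIZE);
            String line;
            int batchId = 0;
            while ((line = reader.readLine()) != null) {
                batch.add(line);
                if (batch.size() == BATCH_SIZE) {
                    pending.add(submitBatch(pool, batch, batchId++));
                    batch = new ArrayList<>(BATCH_SIZE);
                }
            }
            if (!batch.isEmpty()) {
                pending.add(submitBatch(pool, batch, batchId));
            }
        }

        for (Future<?> f : pending) {
            f.get(); // wait for every batch; propagates any exception from a task
        }
        pool.shutdown();
    }

    private static Future<?> submitBatch(ExecutorService pool, List<String> batch, int id) {
        return pool.submit(() -> {
            Collections.sort(batch); // natural string order stands in for my real comparator
            Files.write(Path.of(String.format("batches/batch_%d.txt", id)), batch);
            return null;
        });
    }
}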

EDIT: My code is

import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileInputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.PriorityQueue;

public class Main {

    private static final int BUFFER_SIZE = 10_000_000;

    /**
     * A main method to run examples.
     *
     * @param args not used
     */
    public static void main(String[] args) throws IOException {
        System.out.println("Starting...");
        String file = args[0];
        int batchSize = Integer.parseInt(args[1]);

        try {
            FileInputStream fin = new FileInputStream(file);
            BufferedInputStream bis = new BufferedInputStream(fin, BUFFER_SIZE);
            BufferedReader br = new BufferedReader(new InputStreamReader(bis), BUFFER_SIZE);

            int lineNumber = 0;
            int batchId = 0;
            String line;
            TaxiEntry[] batch = new TaxiEntry[batchSize];
            int i = 0;
            while ((line = br.readLine()) != null) {
                TaxiEntry taxiEntry = parseLine(line);
                batch[i++] = taxiEntry;
                lineNumber++;
                if (lineNumber % batchSize == 0) {
                    String outputFileName = String.format("batches/batch_%d.txt", batchId);
                    BufferedWriter bf = new BufferedWriter(new FileWriter(outputFileName, true), BUFFER_SIZE);
                    Arrays.parallelSort(batch);
                    for (int j = 0; j < i; j++) {
                        bf.write(batch[j].toString());
                        if (j != i - 1) { // no newline after the last line of the batch
                            bf.newLine();
                        }
                    }
                    batchId++;
                    i = 0;
                    bf.close(); // flush and release the file handle for this batch
                }
            }

            if (i > 0) { // leftover lines that did not fill a complete batch
                String outputFileName = String.format("batches/batch_%d.txt", batchId);
                BufferedWriter bf = new BufferedWriter(new FileWriter(outputFileName, true), BUFFER_SIZE);
                Arrays.parallelSort(batch, 0, i);
                for (int j = 0; j < i; j++) {
                    bf.write(batch[j].toString());
                    if (j != i - 1) {
                        bf.newLine();
                    }
                }
                batchId++;
                bf.close();
            }

            System.out.println("Processed " + lineNumber + " lines");
            merge(batchId);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void merge(int numBatches) throws IOException {
        System.out.println("Starting merge...");
        // Open readers
        BufferedReader[] readers = new BufferedReader[numBatches];
        for (int i = 0; i < numBatches; i++) {
            String file = String.format("batches/batch_%d.txt", i);
            FileInputStream fin = new FileInputStream(file);
            BufferedInputStream bis = new BufferedInputStream(fin, BUFFER_SIZE);
            BufferedReader br = new BufferedReader(new InputStreamReader(bis), BUFFER_SIZE);
            readers[i] = br;
        }
        
        // Merge
        String outputFileName = "result/final.txt";
        BufferedWriter bf = new BufferedWriter(new FileWriter(outputFileName, true), BUFFER_SIZE);

        PriorityQueue<IndexedTaxiNode> curEntries = new PriorityQueue<>();
        for (int i = 0; i < numBatches; i++) {
            BufferedReader reader = readers[i];
            String next = reader.readLine();
            if (next != null) {
                TaxiEntry curr = parseLine(next);
                curEntries.add(new IndexedTaxiNode(curr, i));
            }
        }

        while (!curEntries.isEmpty()) {
            // get max from curEntries
            IndexedTaxiNode maxNode = curEntries.remove();
            bf.write(maxNode.toString());
            bf.newLine();

            int index = maxNode.index;
            String next = readers[index].readLine();
            if (next != null) {
                TaxiEntry newEntry = parseLine(next);
                curEntries.add(new IndexedTaxiNode(newEntry, index));
            }
        }

        bf.close();
        for (BufferedReader reader : readers) {
            reader.close();
        }
    }

    public static TaxiEntry parseLine(String line) {
        return new TaxiEntry(line, Double.parseDouble(line.split(",")[16]));
    }

}
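
I run it roughly like this (the input file name and batch size here are just examples; the batches/ and result/ directories have to exist beforehand):

java Main taxi_data.csv 1000000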

1 Answer

Answer by mpette:

Doing some timings, I found that the time to read from disk and the time to sort are of a similar order of magnitude.

System.out.println("Begin loading file");
// do loading stuff
System.out.format("elapsed %.03f ms%n%n", (finishTime - startTime) / 1e6);

System.out.println("Sorting lines");
// do sorting stuff
System.out.format("elapsed %.03f ms%n", (finishTime - startTime) / 1e6);

Console output is:

Begin loading file
elapsed 918.933 ms

Sorting lines
elapsed 1360.896 ms

I used a modest file of about 150 MB for the timings. It might not be a good idea to have lots of threads all reading from disk at the same time.

My suggestion, for what it's worth, is to have one thread do all of the disk I/O and a second thread do the sorting concurrently. I could only see a way to do this for the splitting and sorting phase.

Separate threads for disk and sort

For the splitting phase, you cannot read all the segments in one go because that would consume too much memory, so you read a few segments, write a few, read a few, and so on. The idea of this interleaving is to keep the disk continuously busy by delegating the sorting to another thread. Hopefully, by the time the disk is ready to write a segment, the sort of that segment has already completed, so the disk never has to wait.

List<String> lines = new ArrayList<>();
int i = 0;
while (someCondition()) {
    String line = reader.readLine();
    lines.add(line);
    if (lines.size() == BATCH_SIZE) {
        sendMsgToWorker(lines); // send to worker thread
        if (i == MAX_MESSAGE_QUEUE - 1) {
            for (int j = 0; j < MAX_MESSAGE_QUEUE; j++) {
                List<String> sortedLines = waitForLineFromWorker(); // wait for worker thread
                writeTmpFile(sortedLines);
            }
        }
        lines = new ArrayList<>();
        i = (i + 1) % MAX_MESSAGE_QUEUE;
    }
}

An outline for the splitting and sorting phase is shown above, without covering any edge cases. The amount of memory used would be proportional to BATCH_SIZE * MAX_MESSAGE_QUEUE.
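
For completeness, here is one way (my own sketch, not the only one) that the hand-off in the outline could be wired up with two bounded queues; the class name and queue capacity are placeholders. sendMsgToWorker and waitForLineFromWorker above would then just be put and take on these queues, and the sorting thread would look roughly like this:

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Sketch of the sorting thread: the reader thread puts raw batches on toSort,
// this thread takes them, sorts them, and puts the results on sorted.
public class SortWorker implements Runnable {

    static final int MAX_MESSAGE_QUEUE = 4; // same constant as in the outline above

    final BlockingQueue<List<String>> toSort = new ArrayBlockingQueue<>(MAX_MESSAGE_QUEUE);
    final BlockingQueue<List<String>> sorted = new ArrayBlockingQueue<>(MAX_MESSAGE_QUEUE);

    @Override
    public void run() {
        try {
            while (true) {
                List<String> batch = toSort.take(); // blocks until the reader sends a batch
                batch.sort(null);                   // natural order; the real comparator goes here
                sorted.put(batch);                  // blocks if the writer has fallen behind
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();     // the reader signals shutdown by interrupting
        }
    }

    // sendMsgToWorker(lines)   ->  worker.toSort.put(lines)
    // waitForLineFromWorker()  ->  worker.sorted.take()
}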

Unfortunately, I don't see a way to apply concurrency to the phase of merging the multiple files. The disk is just the disk, so it cannot go any faster even with multiple threads.

You could try investigating parallel quicksort, but the problem with quicksort is choosing a pivot point so that the partitions end up a reasonable size.
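
That said, before hand-rolling a parallel quicksort, note that the JDK already ships Arrays.parallelSort (which the splitting code in the question is in fact already calling); for object arrays it is a fork/join sort-merge, so there is no pivot to choose. A minimal example, with made-up data standing in for a loaded batch:

import java.util.Arrays;
import java.util.List;

public class ParallelSortExample {
    public static void main(String[] args) {
        List<String> lines = List.of("charlie", "alpha", "bravo"); // stand-in for a loaded batch
        String[] arr = lines.toArray(new String[0]);
        Arrays.parallelSort(arr); // runs on the common ForkJoinPool under the hood
        System.out.println(Arrays.toString(arr));
    }
}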