The following Java code reads a large file, input.txt, in chunks concurrently and then writes the contents out to output.txt. This is an academic exercise, not a real project, so I know there are better approaches. But I cannot figure out why this code deadlocks.
Any idea?
package org.sid.misc;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.*;

class FileChunk {
    private final int order;
    private final byte[] data;

    public FileChunk(int order, byte[] data) {
        this.order = order;
        this.data = data;
    }

    public int getOrder() {
        return order;
    }

    public byte[] getData() {
        return data;
    }
}

class FileReaderTask implements Runnable {
    private final String inputFilePath;
    private final BlockingQueue<FileChunk> queue;
    private final int chunkSize;
    private final int order;

    public FileReaderTask(String inputFilePath, BlockingQueue<FileChunk> queue, int chunkSize, int order) {
        this.inputFilePath = inputFilePath;
        this.queue = queue;
        this.chunkSize = chunkSize;
        this.order = order;
    }

    @Override
    public void run() {
        try (FileInputStream fis = new FileInputStream(inputFilePath);
             FileChannel channel = fis.getChannel()) {
            long position = order * chunkSize;
            ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
            if (channel.read(buffer, position) > 0) {
                buffer.flip();
                byte[] data = new byte[buffer.remaining()];
                buffer.get(data);
                queue.put(new FileChunk(order, data));
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
            System.err.println("Error reading file chunk: " + e.getMessage());
        }
    }
}

class FileWriterTask implements Runnable {
    private final String outputFilePath;
    private final BlockingQueue<FileChunk> queue;

    public FileWriterTask(String outputFilePath, BlockingQueue<FileChunk> queue) {
        this.outputFilePath = outputFilePath;
        this.queue = queue;
    }

    @Override
    public void run() {
        try (FileOutputStream fos = new FileOutputStream(outputFilePath, true)) {
            while (true) {
                FileChunk chunk = queue.take();
                if (chunk.getData().length == 0) {
                    break; // Signal to stop writing
                }
                fos.write(chunk.getData());
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
            System.err.println("Error writing file chunk: " + e.getMessage());
        }
    }
}

class Test {
    private static final int CHUNK_SIZE = 1024 * 1024; // 1MB chunks
    private static final int NUM_THREADS = Runtime.getRuntime().availableProcessors();

    public static void main(String[] args) throws InterruptedException {
        String inputFilePath = "input.txt";
        String outputFilePath = "output.txt";
        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
        BlockingQueue<FileChunk> queue = new LinkedBlockingQueue<>();
        long startTime = System.currentTimeMillis();
        // Start the writer task
        executor.submit(new FileWriterTask(outputFilePath, queue));
        // Start the reader tasks
        for (int i = 0; i < NUM_THREADS; i++) {
            executor.submit(new FileReaderTask(inputFilePath, queue, CHUNK_SIZE, i));
        }
        // Wait for all tasks to complete
        executor.shutdown();
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
        long endTime = System.currentTimeMillis();
        System.out.println("Execution time: " + (endTime - startTime) + " ms");
    }
}
You are using a blocking queue, and as the docs state, `take()` blocks until an element is available.
Your writer only exits its loop when it takes a chunk whose data is empty, but no reader ever puts such a sentinel chunk on the queue. Once the readers finish and the queue drains, the writer thread blocks in `take()` indefinitely, so the pool never becomes idle and `awaitTermination` never returns.
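One way to break the deadlock is to wait for all readers to finish (e.g. with a `CountDownLatch`) and only then enqueue the empty "poison pill" chunk the writer is waiting for. Here is a minimal, self-contained sketch of that pattern; the class name, chunk sizes, and use of raw `byte[]` instead of `FileChunk` are illustrative, not taken from your code:

```java
import java.util.concurrent.*;

public class PoisonPillDemo {
    static int runDemo() throws Exception {
        int readers = 4;
        BlockingQueue<byte[]> queue = new LinkedBlockingQueue<>();
        CountDownLatch done = new CountDownLatch(readers);
        ExecutorService executor = Executors.newFixedThreadPool(readers + 1);

        // Writer: drains the queue until it sees the empty sentinel chunk.
        Future<Integer> written = executor.submit(() -> {
            int bytes = 0;
            while (true) {
                byte[] chunk = queue.take();
                if (chunk.length == 0) {
                    break; // poison pill: stop writing
                }
                bytes += chunk.length;
            }
            return bytes;
        });

        // Readers: each puts one chunk, then counts down the latch.
        for (int i = 0; i < readers; i++) {
            executor.submit(() -> {
                try {
                    queue.put(new byte[1024]); // stands in for a real file chunk
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            });
        }

        done.await();           // wait until every reader is done
        queue.put(new byte[0]); // NOW send the poison pill the writer expects

        int total = written.get(); // writer terminates normally
        executor.shutdown();
        return total;
    }

    public static void main(String[] args) throws Exception {
        System.out.println("bytes written: " + runDemo());
    }
}
```

The key ordering is `done.await()` before `queue.put(new byte[0])`: the sentinel must go in only after all real chunks, otherwise the writer could stop early. In your code, the equivalent change would be to submit the readers, wait for them (latch or a separate reader pool with `awaitTermination`), then put a `FileChunk` with an empty data array before shutting the main executor down.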