Profile Photo

Processing a large file using Java

Created on: Jan 30, 2025

A BufferedReader can be used to read the file line by line, so the whole file never has to be loaded into memory. Each line is processed and transformed individually, and the result is written to another file or a database as soon as it is ready.

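We will use multithreading via the Executor framework so that multiple lines can be processed in parallel. The program below uses a LinkedBlockingQueue because it is thread-safe and, being bounded, makes the reader wait whenever the workers fall behind. Note that because several workers write concurrently, the output lines are not guaranteed to be in the same order as the input. The program is shown below.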

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Locale;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Transforms a large text/CSV file line by line using a bounded producer/consumer pipeline.
 *
 * <p>One thread (the caller) reads the input and puts each line on a bounded queue; a fixed pool
 * of {@link Worker}s takes lines from the queue, transforms them and appends them to a shared
 * output writer. Because several workers write concurrently, output lines are NOT guaranteed to
 * appear in input order.
 */
class MultiThreadedCSVProcessor {

    /** Number of worker threads: one per available processor. */
    private static final int NUM_WORKERS = Runtime.getRuntime().availableProcessors();

    /** Maximum number of lines buffered between reader and workers; adjust to available memory. */
    private static final int QUEUE_CAPACITY = 10000;

    /**
     * End-of-input marker placed on the queue once per worker.
     *
     * <p>Deliberately a distinct {@code String} instance that is compared by identity
     * ({@code ==}), so a data row whose text happens to be {@code "EOF"} is processed like any
     * other row instead of stopping a worker.
     */
    @SuppressWarnings("StringOperationCanBeSimplified")
    private static final String POISON_PILL = new String("EOF");

    /**
     * Command-line entry point.
     *
     * @param args optional: {@code args[0]} is the input path (default {@code large_file.csv}),
     *             {@code args[1]} is the output path (default {@code processed_output.csv})
     */
    public static void main(String[] args) {
        String inputFile = args.length > 0 ? args[0] : "large_file.csv";
        String outputFile = args.length > 1 ? args[1] : "processed_output.csv";

        try {
            process(inputFile, outputFile);
        } catch (IOException e) {
            System.err.println("Failed to process " + inputFile + " -> " + outputFile);
            e.printStackTrace();
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can see the interruption.
            Thread.currentThread().interrupt();
            System.err.println("Interrupted while processing " + inputFile);
        }
    }

    /**
     * Reads {@code inputFile} line by line, transforms every line on a worker pool and writes the
     * results to {@code outputFile} (created or truncated). Both files are treated as UTF-8.
     *
     * <p>The method returns only after every worker has finished, so the output is complete and
     * closed on normal return.
     *
     * @param inputFile  path of the file to read
     * @param outputFile path of the file to write
     * @throws IOException          if reading fails, or if any worker failed while transforming
     *                              or writing (the worker's exception is attached as the cause)
     * @throws InterruptedException if the calling thread is interrupted while queueing lines or
     *                              waiting for the workers
     */
    static void process(String inputFile, String outputFile)
            throws IOException, InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(QUEUE_CAPACITY);
        AtomicReference<Exception> failure = new AtomicReference<>();

        // InputStreamReader replaces malformed bytes instead of throwing, so a stray bad byte
        // in a huge file does not abort the whole run.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                     Files.newInputStream(Paths.get(inputFile)), StandardCharsets.UTF_8));
             BufferedWriter writer =
                     Files.newBufferedWriter(Paths.get(outputFile), StandardCharsets.UTF_8)) {

            ExecutorService executor = Executors.newFixedThreadPool(NUM_WORKERS);
            boolean completed = false;
            try {
                for (int i = 0; i < NUM_WORKERS; i++) {
                    executor.execute(new Worker(queue, writer, failure));
                }

                // Stop reading early once a worker has reported a failure.
                String line;
                while (failure.get() == null && (line = reader.readLine()) != null) {
                    queue.put(line); // Blocks while the queue is full (back-pressure).
                }

                // One pill per worker; each worker consumes exactly one and exits.
                for (int i = 0; i < NUM_WORKERS; i++) {
                    queue.put(POISON_PILL);
                }

                // Wait INSIDE the resource scope: the writer must stay open until the last
                // worker has written its last line.
                executor.shutdown();
                executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
                completed = true;
            } finally {
                if (!completed) {
                    // Abnormal exit (read error / interruption): no pills were delivered, so
                    // interrupt the workers to release them from take(), then give them a
                    // moment to stop touching the writer before it is closed.
                    executor.shutdownNow();
                    awaitQuietly(executor);
                }
            }
        }

        Exception cause = failure.get();
        if (cause != null) {
            throw new IOException("Worker failed while producing " + outputFile, cause);
        }
    }

    /** Waits briefly for the pool to terminate, preserving (not throwing) an interruption. */
    private static void awaitQuietly(ExecutorService executor) {
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Consumer task: takes lines from the queue until it receives the end-of-input marker,
     * transforms each line and appends it (plus {@code '\n'}) to the shared writer.
     *
     * <p>Writes are serialized by synchronizing on the writer instance. After the first failure
     * recorded in the shared failure slot, a worker keeps draining the queue without writing so
     * that the producer can never block forever on a full queue.
     */
    static class Worker implements Runnable {
        private final BlockingQueue<String> queue;
        private final Writer writer;
        private final AtomicReference<Exception> failure;

        /**
         * Creates a worker with its own private failure slot.
         *
         * @param queue  source of lines to process
         * @param writer shared output; also used as the lock guarding writes
         */
        public Worker(BlockingQueue<String> queue, Writer writer) {
            this(queue, writer, new AtomicReference<>());
        }

        /**
         * Creates a worker that reports failures through a shared slot.
         *
         * @param queue   source of lines to process
         * @param writer  shared output; also used as the lock guarding writes
         * @param failure receives the first exception raised by any worker sharing this slot
         */
        Worker(BlockingQueue<String> queue, Writer writer, AtomicReference<Exception> failure) {
            this.queue = queue;
            this.writer = writer;
            this.failure = failure;
        }

        @Override
        public void run() {
            try {
                while (true) {
                    String line = queue.take();
                    if (line == POISON_PILL) { // identity check on purpose, see POISON_PILL
                        return;
                    }
                    if (failure.get() != null) {
                        continue; // Already failed: just drain so the producer is not blocked.
                    }
                    try {
                        String processedLine = processLine(line);
                        synchronized (writer) { // Keep line + terminator together in the output.
                            writer.write(processedLine);
                            writer.write('\n');
                        }
                    } catch (IOException | RuntimeException e) {
                        failure.compareAndSet(null, e); // Keep only the first failure.
                    }
                }
            } catch (InterruptedException e) {
                // Cancellation request: restore the flag and stop.
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Example transformation: upper-cases the line.
         *
         * <p>Uses {@link Locale#ROOT} so the result does not depend on the JVM's default locale.
         */
        private String processLine(String line) {
            return line.toUpperCase(Locale.ROOT);
        }
    }
}