高效讀取大文件,再也不用擔心 OOM 了!


內存讀取

第一個版本,采用內存讀取的方式,所有的數據首先讀讀取到內存中,程序代碼如下:

Stopwatch stopwatch = Stopwatch.createStarted();
// 將全部行數讀取的內存中
List<String> lines = FileUtils.readLines(new File("temp/test.txt"), Charset.defaultCharset());
for (String line : lines) {
    // pass
}
stopwatch.stop();
System.out.println("read all lines spend " + stopwatch.elapsed(TimeUnit.SECONDS) + " s");
// 計算內存占用
logMemory();

logMemory方法如下:

MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
//堆內存使用情況
MemoryUsage memoryUsage = memoryMXBean.getHeapMemoryUsage();
//初始的總內存
long totalMemorySize = memoryUsage.getInit();
//已使用的內存
long usedMemorySize = memoryUsage.getUsed();
 
 
System.out.println("Total Memory: " + totalMemorySize / (1024 * 1024) + " Mb");
System.out.println("Free Memory: " + usedMemorySize / (1024 * 1024) + " Mb");

上述程序中,使用 Apache Common-Io 開源第三方庫,FileUtils#readLines將會把文件中所有內容,全部讀取到內存中。

這個程序簡單測試並沒有什么問題,但是等拿到真正的數據文件,運行程序,很快程序發生了 OOM。

之所以會發生 OOM,主要原因是因為這個數據文件太大。假設上面測試文件 test.txt總共有 200W 行數據,文件大小為:740MB。

通過上述程序讀取到內存之后,在我的電腦上內存占用情況如下:

可以看到一個實際大小為 700 多 M 的文件,讀到內存中占用內存量為 1.5G 之多。而我之前的程序,虛擬機設置內存大小只有 1G,所以程序發生了 OOM。

當然這里最簡單的辦法就是加內存唄,將虛擬機內存設置到 2G,甚至更多。不過機器內存始終有限,如果文件更大,還是沒有辦法全部都加載到內存。

不過仔細一想真的需要將全部數據一次性加載到內存中?

很顯然,不需要!

在上述的場景中,我們將數據到加載內存中,最后不還是一條條處理數據。

所以下面我們將讀取方式修改成逐行讀取。

逐行讀取

逐行讀取的方式比較多,這里主要介紹兩種方式:

  • BufferReader

  • Apache Commons IO

  • Java8 stream

BufferReader

我們可以使用 BufferReader#readLine 逐行讀取數據。

try (BufferedReader fileBufferReader = new BufferedReader(new FileReader("temp/test.txt"))) {
    String fileLineContent;
    while ((fileLineContent = fileBufferReader.readLine()) != null) {
        // process the line.
    }
} catch (FileNotFoundException e) {
    e.printStackTrace();
} catch (IOException e) {
    e.printStackTrace();
}

Apache Commons IO

Common-IO 中有一個方法 FileUtils#lineIterator可以實現逐行讀取方式,使用代碼如下:

Stopwatch stopwatch = Stopwatch.createStarted();
LineIterator fileContents = FileUtils.lineIterator(new File("temp/test.txt"), StandardCharsets.UTF_8.name());
while (fileContents.hasNext()) {
    fileContents.nextLine();
    //  pass
}
logMemory();
fileContents.close();
stopwatch.stop();
System.out.println("read all lines spend " + stopwatch.elapsed(TimeUnit.SECONDS) + " s");

這個方法返回一個迭代器,每次我們都可以獲取的一行數據。

其實我們查看代碼,其實可以發現 FileUtils#lineIterator,其實用的就是 BufferReader,感興趣的同學可以自己查看一下源碼。

Java8 stream

Java8 Files 類新增了一個 lines,可以返回 Stream我們可以逐行處理數據。

Stopwatch stopwatch = Stopwatch.createStarted();
// lines(Path path, Charset cs)
try (Stream<String> inputStream = Files.lines(Paths.get("temp/test.txt"), StandardCharsets.UTF_8)) {
    inputStream
            .filter(str -> str.length() > 5)// 過濾數據
            .forEach(o -> {
                // pass do sample logic
            });
}
logMemory();
stopwatch.stop();
System.out.println("read all lines spend " + stopwatch.elapsed(TimeUnit.SECONDS) + " s");

使用這個方法有個好處在於,我們可以方便使用 Stream 鏈式操作,做一些過濾操作。

注意:這里我們使用 try-with-resources 方式,可以安全的確保讀取結束,流可以被安全的關閉。

並發讀取

逐行的讀取的方式,解決我們 OOM 的問題。不過如果數據很多,我們這樣一行行處理,需要花費很多時間。

上述的方式,只有一個線程在處理數據,那其實我們可以多來幾個線程,增加並行度。

下面在上面的基礎上,就拋磚引玉,介紹下自己比較常用兩種並行處理方式。

逐行批次打包

第一種方式,先逐行讀取數據,加載到內存中,等到積累一定數據之后,然后再交給線程池異步處理。

@SneakyThrows
public static void readInApacheIOWithThreadPool() {
    // 創建一個 最大線程數為 10,隊列最大數為 100 的線程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60l, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100));
    // 使用 Apache 的方式逐行讀取數據
    LineIterator fileContents = FileUtils.lineIterator(new File("temp/test.txt"), StandardCharsets.UTF_8.name());
    List<String> lines = Lists.newArrayList();
    while (fileContents.hasNext()) {
        String nextLine = fileContents.nextLine();
        lines.add(nextLine);
        // 讀取到十萬的時候
        if (lines.size() == 100000) {
            // 拆分成兩個 50000 ,交給異步線程處理
            List<List<String>> partition = Lists.partition(lines, 50000);
            List<Future> futureList = Lists.newArrayList();
            for (List<String> strings : partition) {
                Future<?> future = threadPoolExecutor.submit(() -> {
                    processTask(strings);
                });
                futureList.add(future);
            }
            // 等待兩個線程將任務執行結束之后,再次讀取數據。這樣的目的防止,任務過多,加載的數據過多,導致 OOM
            for (Future future : futureList) {
                // 等待執行結束
                future.get();
            }
            // 清除內容
            lines.clear();
        }
 
 
    }
    // lines 若還有剩余,繼續執行結束
    if (!lines.isEmpty()) {
        // 繼續執行
        processTask(lines);
    }
  threadPoolExecutor.shutdown();
}
    private static void processTask(List<String> strings) {
        for (String line : strings) {
            // 模擬業務執行
            try {
                TimeUnit.MILLISECONDS.sleep(10L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

上述方法,等到內存的數據到達 10000 的時候,拆封兩個任務交給異步線程執行,每個任務分別處理 50000 行數據。

后續使用  future#get(),等待異步線程執行完成之后,主線程才能繼續讀取數據。

之所以這么做,主要原因是因為,線程池的任務過多,再次導致 OOM 的問題。

大文件拆分成小文件

第二種方式,首先我們將一個大文件拆分成幾個小文件,然后使用多個異步線程分別逐行處理數據。

public static void splitFileAndRead() throws Exception {
    // 先將大文件拆分成小文件
    List<File> fileList = splitLargeFile("temp/test.txt");
    // 創建一個 最大線程數為 10,隊列最大數為 100 的線程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(10, 10, 60l, TimeUnit.SECONDS, new LinkedBlockingDeque<>(100));
    List<Future> futureList = Lists.newArrayList();
    for (File file : fileList) {
        Future<?> future = threadPoolExecutor.submit(() -> {
            try (Stream inputStream = Files.lines(file.toPath(), StandardCharsets.UTF_8)) {
                inputStream.forEach(o -> {
                    // 模擬執行業務
                    try {
                        TimeUnit.MILLISECONDS.sleep(10L);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            } catch (IOException e) {
                e.printStackTrace();
            }
        });
        futureList.add(future);
    }
    for (Future future : futureList) {
        // 等待所有任務執行結束
        future.get();
    }
    threadPoolExecutor.shutdown();
 }
 
 
private static List<File> splitLargeFile(String largeFileName) throws IOException {
    LineIterator fileContents = FileUtils.lineIterator(new File(largeFileName), StandardCharsets.UTF_8.name());
    List<String> lines = Lists.newArrayList();
    // 文件序號
    int num = 1;
    List<File> files = Lists.newArrayList();
    while (fileContents.hasNext()) {
        String nextLine = fileContents.nextLine();
        lines.add(nextLine);
        // 每個文件 10w 行數據
        if (lines.size() == 100000) {
            createSmallFile(lines, num, files);
            num++;
        }
    }
    // lines 若還有剩余,繼續執行結束
    if (!lines.isEmpty()) {
        // 繼續執行
        createSmallFile(lines, num, files);
    }
    return files;
}

上述方法,首先將一個大文件拆分成多個保存 10W 行的數據的小文件,然后再將小文件交給線程池異步處理。

由於這里的異步線程每次都是逐行從小文件的讀取數據,所以這種方式不用像上面方法一樣擔心 OOM 的問題。

另外,上述我們使用 Java 代碼,將大文件拆分成小文件。這里還有一個簡單的辦法,我們可以直接使用下述命令,直接將大文件拆分成小文件:

# 將大文件拆分成 100000 的小文件
split -l 100000 test.txt

后續 Java 代碼只需要直接讀取小文件即可。

總結

當我們從文件讀取數據時,如果文件不是很大,我們可以考慮一次性讀取到內存中,然后快速處理。

如果文件過大,我們就沒辦法一次性加載到內存中,所以我們需要考慮逐行讀取,然后處理數據。但是單線程處理數據畢竟有限,所以我們考慮使用多線程,加快處理數據。

本篇文章我們只是簡單介紹了下,數據從文件讀取幾種方式。數據讀取之后,我們肯定還需要處理,然后最后會存儲到數據庫中或者輸出到另一個文件中。

這個過程,說實話比較麻煩,因為我們的數據源文件,可能是 txt,也可能是 excel,這樣我們就需要增加多種讀取方法。同樣的,當數據處理完成之后,也有同樣的問題。

不過好在,上述的問題我們可以使用 Spring Batch 完美解決。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM