Hello,大家好,我是樓下小黑哥~
如果給你一個包含一億行數據的超大文件,讓你在一周之內將數據轉化導入生產數據庫,你會如何操作?
上面的問題其實是小黑哥前段時間接到一個真實的業務需求,將一個老系統歷史數據通過線下文件的方式遷移到新的生產系統。
由於老板們已經敲定了新系統上線時間,所以只留給小黑哥一周的時間將歷史數據導入生產系統。
由於時間緊,而數據量又超大,所以小黑哥設計的過程想到一下解決辦法:
- 拆分文件
- 多線程導入
歡迎關注我的公眾號:小黑十一點半,獲得日常干貨推送。如果您對我的專題內容感興趣,也可以關注我的博客:studyidea.cn
拆分文件
首先我們可以寫個小程序,或者使用拆分命令 split
將這個超大文件拆分一個個小文件。
-- 將一個大文件拆分成若干個小文件,每個文件 100000 行
split -l 100000 largeFile.txt -d -a 4 smallFile_
這里之所以選擇先將大文件拆分,主要考慮到兩個原因:
第一如果程序直接讀取這個大文件,假設讀取一半的時候,程序突然宕機,這樣就會直接丟失文件讀取的進度,又需要重新開頭讀取。
而文件拆分之后,一旦小文件讀取結束,我們可以將小文件移動一個指定文件夾。
這樣即使應用程序宕機重啟,我們重新讀取時,只需要讀取剩余的文件。
第二,一個文件,只能被一個應用程序讀取,這樣就限制了導入的速度。
而文件拆分之后,我們可以采用多節點部署的方式,水平擴展。每個節點讀取一部分文件,這樣就可以成倍的加快導入速度。
多線程導入
當我們拆分完文件,接着我們就需要讀取文件內容,進行導入。
之前拆分的時候,設置每個小文件包含 10w 行的數據。由於擔心一下子將 10w 數據讀取應用中,導致堆內存占用過高,引起頻繁的 Full GC,所以下面采用流式讀取的方式,一行一行的讀取數據。
當然了,如果拆分之后文件很小,或者說應用的堆內存設置很大,我們可以直接將文件加載到應用內存中處理。這樣相對來說簡單一點。
逐行讀取的代碼如下:
File file = ...
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
while (iterator.hasNext()) {
String line=iterator.nextLine();
convertToDB(line);
}
}
上面代碼使用 commons-io
中的 LineIterator
類,這個類底層使用了 BufferedReader
讀取文件內容。它將其封裝成迭代器模式,這樣我們可以很方便的迭代讀取。
如果當前使用 JDK1.8 ,那么上述操作更加簡單,我們可以直接使用 JDK 原生的類 Files
將文件轉成 Stream
方式讀取,代碼如下:
Files.lines(Paths.get("文件路徑"), Charset.defaultCharset()).forEach(line -> {
convertToDB(line);
});
其實仔細看下 Files#lines
底層源碼,其實原理跟上面的 LineIterator
類似,同樣也是封裝成迭代器模式。
多線程的引入存在的問題
上述讀取的代碼寫起來不難,但是存在效率問題,主要是因為只有單線程在導入,上一行數據導入完成之后,才能繼續操作下一行。
為了加快導入速度,那我們就多來幾個線程,並發導入。
多線程我們自然將會使用線程池的方式,相關代碼改造如下:
File file = ...;
ExecutorService executorService = new ThreadPoolExecutor(
5,
10,
60,
TimeUnit.MINUTES,
// 文件數量,假設文件包含 10W 行
new ArrayBlockingQueue<>(10*10000),
// guava 提供
new ThreadFactoryBuilder().setNameFormat("test-%d").build());
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
while (iterator.hasNext()) {
String line = iterator.nextLine();
executorService.submit(() -> {
convertToDB(line);
});
}
}
上述代碼中,每讀取到一行內容,就會直接交給線程池來執行。
我們知道線程池原理如下:
- 如果核心線程數未滿,將會直接創建線程執行任務。
- 如果核心線程數已滿,將會把任務放入到隊列中。
- 如果隊列已滿,將會再創建線程執行任務。
- 如果最大線程數已滿,隊列也已滿,那么將會執行拒絕策略。
由於我們上述線程池設置的核心線程數為 5,很快就到達了最大核心線程數,后續任務只能被加入隊列。
為了后續任務不被線程池拒絕,我們可以采用如下方案:
- 將隊列容量設置成很大,包含整個文件所有行數
- 將最大線程數設置成很大,數量大於件所有行數
以上兩種方案都存在同樣的問題,第一種是相當於將文件所有內容加載到內存,將會占用過多內存。
而第二種創建過多的線程,同樣也會占用過多內存。
一旦內存占用過多,GC 無法清理,就可能會引起頻繁的 Full GC,甚至導致 OOM,導致程序導入速度過慢。
解決這個問題,我們可以如下兩種解決方案:
CountDownLatch
批量執行- 擴展線程池
CountDownLatch
批量執行
JDK 提供的 CountDownLatch
,可以讓主線程等待子線程都執行完成之后,再繼續往下執行。
利用這個特性,我們可以改造多線程導入的代碼,主體邏輯如下:
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
// 存儲每個任務執行的行數
List<String> lines = Lists.newArrayList();
// 存儲異步任務
List<ConvertTask> tasks = Lists.newArrayList();
while (iterator.hasNext()) {
String line = iterator.nextLine();
lines.add(line);
// 設置每個線程執行的行數
if (lines.size() == 1000) {
// 新建異步任務,注意這里需要創建一個 List
tasks.add(new ConvertTask(Lists.newArrayList(lines)));
lines.clear();
}
if (tasks.size() == 10) {
asyncBatchExecuteTask(tasks);
}
}
// 文件讀取結束,但是可能還存在未被內容
tasks.add(new ConvertTask(Lists.newArrayList(lines)));
// 最后再執行一次
asyncBatchExecuteTask(tasks);
}
這段代碼中,每個異步任務將會導入 1000 行數據,等積累了 10 個異步任務,然后將會調用 asyncBatchExecuteTask
使用線程池異步執行。
/**
* 批量執行任務
*
* @param tasks
*/
private static void asyncBatchExecuteTask(List<ConvertTask> tasks) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(tasks.size());
for (ConvertTask task : tasks) {
task.setCountDownLatch(countDownLatch);
executorService.submit(task);
}
// 主線程等待異步線程 countDownLatch 執行結束
countDownLatch.await();
// 清空,重新添加任務
tasks.clear();
}
asyncBatchExecuteTask
方法內將會創建 CountDownLatch
,然后主線程內調用 await
方法等待所有異步線程執行結束。
ConvertTask
異步任務邏輯如下:
/**
* 異步任務
* 等數據導入完成之后,一定要調用 countDownLatch.countDown()
* 不然,這個主線程將會被阻塞,
*/
private static class ConvertTask implements Runnable {
private CountDownLatch countDownLatch;
private List<String> lines;
public ConvertTask(List<String> lines) {
this.lines = lines;
}
public void setCountDownLatch(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}
@Override
public void run() {
try {
for (String line : lines) {
convertToDB(line);
}
} finally {
countDownLatch.countDown();
}
}
}
ConvertTask
任務類邏輯就非常簡單,遍歷所有行,將其導入到數據庫中。所有數據導入結束,調用 countDownLatch#countDown
。
一旦所有異步線程執行結束,調用 countDownLatch#countDown
,主線程將會被喚醒,繼續執行文件讀取。
雖然這種方式解決上述問題,但是這種方式,每次都需要積累一定任務數才能開始異步執行所有任務。
另外每次都需要等待所有任務執行結束之后,才能開始下一批任務,批量執行消耗的時間等於最慢的異步任務消耗的時間。
這種方式線程池中線程存在一定的閑置時間,那有沒有辦法一直壓榨線程池,讓它一直在干活呢?
擴展線程池
回到最開始的問題,文件讀取導入,其實就是一個生產者-消費者消費模型。
主線程作為生產者不斷讀取文件,然后將其放置到隊列中。
異步線程作為消費者不斷從隊列中讀取內容,導入到數據庫中。
一旦隊列滿載,生產者應該阻塞,直到消費者消費任務。
其實我們使用線程池的也是一個生產者-消費者消費模型,其也使用阻塞隊列。
那為什么線程池在隊列滿載的時候,不發生阻塞?
這是因為線程池內部使用 offer
方法,這個方法在隊列滿載的時候不會發生阻塞,而是直接返回 。
那我們有沒有辦法在線程池隊列滿載的時候,阻塞主線程添加任務?
其實是可以的,我們自定義線程池拒絕策略,當隊列滿時改為調用 BlockingQueue.put
來實現生產者的阻塞。
RejectedExecutionHandler rejectedExecutionHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
if (!executor.isShutdown()) {
try {
executor.getQueue().put(r);
} catch (InterruptedException e) {
// should not be interrupted
}
}
}
};
這樣一旦線程池滿載,主線程將會被阻塞。
使用這種方式之后,我們可以直接使用上面提到的多線程導入的代碼。
ExecutorService executorService = new ThreadPoolExecutor(
5,
10,
60,
TimeUnit.MINUTES,
new ArrayBlockingQueue<>(100),
new ThreadFactoryBuilder().setNameFormat("test-%d").build(),
(r, executor) -> {
if (!executor.isShutdown()) {
try {
// 主線程將會被阻塞
executor.getQueue().put(r);
} catch (InterruptedException e) {
// should not be interrupted
}
}
});
File file = new File("文件路徑");
try (LineIterator iterator = IOUtils.lineIterator(new FileInputStream(file), "UTF-8")) {
while (iterator.hasNext()) {
String line = iterator.nextLine();
executorService.submit(() -> convertToDB(line));
}
}
小結
一個超大的文件,我們可以采用拆分文件的方式,將其拆分成多份文件,然后部署多個應用程序提高讀取速度。
另外讀取過程我們還可以使用多線程的方式並發導入,不過我們需要注意線程池滿載之后,將會拒絕后續任務。
我們可以通過擴展線程池,自定義拒絕策略,使讀取主線程阻塞。
好了,今天文章內容就到這里,不知道各位有沒有其他更好的解決辦法,歡迎留言討論。
歡迎關注我的公眾號:小黑十一點半,獲得日常干貨推送。如果您對我的專題內容感興趣,也可以關注我的博客:studyidea.cn