在很多應用場景中,我們都會面臨着排序需求,可以說是見怪不怪。我們也看過許多的排序算法:從最簡單的冒泡排序、選擇排序,到稍微好點的插入排序、希爾排序,再到有點理論的堆排序、快速排序,再到高級的歸並排序、桶排序、基數排序。
而實際工作中我們可能用到的排序有哪些呢?而且,大部分時序,相信大家都是使用一個現有庫API直接就完成了排序功能。所以,講真,大家還真不一定會很好排序。
不過本文的目的不是基礎排序算法,而是如何處理數據量的文件的內容排序問題?
1. 多大的文件算大文件?
多大的文件算大文件?這是個定義的問題,當我每天處理的都是幾百幾千的數據,那么我遇到幾萬的數據后,我可以認為這個數據量是大的。
但總體來說,我們還是需要定義一個量級的,不然無法評估處理能力問題。
比如我們定義超過200M的文件算大文件可以不?我們定義超過5000w行的數據算大文件可以不?
好了,基於這樣大的數據量的時候,也許我們就不能簡單的調用幾個庫函數就解決問題了,至少你load到內存也將存在一定的風險了。
所以,是時候想想優化的事了!
2. 如何利用好現有的工具?
針對一些問題,我們可以自己做,也可以找別人幫忙做。具體誰來做,這是個問題!
比如,你自己花個一兩天的時間,寫了個排序算法,搞定了。但是,你能保證你的穩定性嗎?你能經受住生產環境復雜的環境考驗嗎?
再比如,你可以現有的工具進行操作,如果有人提供了穩定的api函數供調用的話,你可以這么干。如果你的服務是跑在linux環境下,那么,我們有必要試一下系統提供的排序功能。 sort . 這工具絕對是經過無數的考驗的,可以放心使用。它也有豐富的參數供選擇,這對我們的日常工作非常有幫助,但對於一個普通的排序也許我們並不需要。
比如最簡單的,自然排序:
sort 1-merged.txt -o 1-sorted.txt
就可以將文件排好序了。但是當數據非常大的時候,比如我使用 7000w+ 的行數(約1.2G)進行排序時,就花費了 6min+ . 也許是我硬件不怎么好,但是實際上它就是會很慢啊!
$ time sort 1-merged.txt -o 1-sorted.txt real 8m5.709s user 25m19.652s sys 0m4.523s
這種性能,在當今大數據橫行的時代,基本就是胎死腹中了。大數據應對的都是TB/PB 級別的數量,而我們僅為GB級並且沒有做其他業務就已經耗費了這么長時間,這是沒辦法繼續了。讓我進一步深入。
看到文檔里有說,系統本地化配置影響排序,實際就是存在一個編解碼的問題,它會依據本地的配置來進行轉換字符然后再進行排序。這個開銷可是不小哦,比如我們設置都是中文環境。而要去除這個影響,則可以使用添加 LC_ALL=C 之后就會使用原始的值進行排序,具體影響就是省去轉換編碼的開銷了。那么,我們用這個參數試試。
*** WARNING *** The locale specified by the environment affects sort order. Set LC_ALL=C to get the traditional sort order that uses native byte values. $ time LC_ALL=C sort 1-merged.txt -o 1-sorted.txt real 2m52.663s user 2m7.577s sys 0m5.146s
哇,從8分鍾降到了3分鍾,雖然不是數量級的提升,但至少下降了一半以上的時間消耗,還是非常可觀的。到這個地步,也許能滿足我們的場景了。
但是,請注意一個問題,這里的 LC_ALL=C 之后,就會使用默認的邏輯進行處理了,那么,會有什么影響呢?實際上就是一些本地相關的東西,就會失效了。
最直接的,就是中文處理的問題了,比如我有一個文件內容是這樣的:
床前明月光,
疑是地上霜。
舉頭望明月,
低頭思故鄉。
天子呼來不上船,
自稱臣是酒中仙。
紅酥手,
黃藤酒,
滿城春色宮牆柳。
那么,我們使用 LC_ALL=C 設置來排序后,將會得到如下結果:
$ LC_ALL=C sort 1.txt -o 1-s1.txt $ cat 1-s1.txt 舉頭望明月, 低頭思故鄉。 天子呼來不上船, 滿城春色宮牆柳。 疑是地上霜。 紅酥手, 自稱臣是酒中仙。 黃藤酒, 床前明月光,
額,看不懂啥意思?中文咋排序的我也給整忘了(而且各自機器上得到的結果也可能不一樣)。好吧,沒關系,我們去掉 LC_ALL=C 來看看結果:
$ sort 1.txt -o 1-s1.txt $ cat 1-s1.txt 床前明月光, 低頭思故鄉。 紅酥手, 黃藤酒, 舉頭望山月, 滿城春色宮牆柳。 天子呼來不上船, 疑是地上霜。 自稱臣是酒中仙。
這下看懂了吧,這是按照拼音順序來排序的。所以,你說 LC_ALL=C 重不重要,如果沒有本地化設置,很多東西都是不符合情理的。所以,有時候我們還真不能這么干咯。
如果真想這么干,除非你確認你的文件里只有英文字符符號和數字,或者是 ASCII 的127 個字符。
3. 繞個路高級一下
前面的方法,不是不能解決問題,而是不能解決所有問題。所以,我們還得繼續想辦法。想想當下對大文件的處理方式都有哪些?實際也不多,並行計算是根本,但我們也許做不了並行計算,但我們可以拆分文件嘛。一個文件太大,我們就文件拆小排序后再合並嘛!就是不知道性能如何?
split -l 100000 -d ../1-merged.txt -a 4 sp_; for file in sp_*.txt; do; sort -o $file sorted_$file; done; sort -m sp_*.txt -o targed.txt; # 一行化后的格式 $ time for file in sp_*; do sort -o sorted_$file $file; done; sort -m sorted_* -o targetd.txt; real 12m15.256s user 10m11.465s sys 0m18.623s # 以上時間僅是單個文件的排序時間還不算歸並的時間,下面這個代碼可以統一計算 $ time `for file in sp_1_*; do sort $file -o sorted_$file; done; sort -m sorted_* -o targetd.txt;` real 14m27.643s user 11m13.982s sys 0m22.636s
看起來切分小文件后,排序太耗時間了,看看能不能用多進程輔助下!(所以最終我們還是回到了並行計算的問題上了)
# shell 異步運行就是在其后面添加 & 就可以了, 但是最后的歸並是同步的. $ time `split -l 100000 -d ../1-merged.txt -a 4 sp_ ; for file in sp_* ; do {sort $file -o $file} &; done; wait; sort -m sp_* -o target.txt ; ` # 多處計時監控 $ time `time split -l 100000 -d ../1-merged.txt -a 4 sp_; time for file in sp_1_*; do { sort $file -o $file } & ; done; time wait; time sort -m sp_* -o target.txt;` # 以上報錯,因為命令行下不允許使用 & 操作, 只能自己寫shell腳本,然后運行了 # sort_merge.sh time split -l 100000 -d ../1-merged.txt -a 4 sp_; i=0 for file in sp_*; do { #echo "sort -o $file $file"; sort -o $file $file; } & done; time wait; time sort -m sp_* -o target.txt; # 以上腳本的確是會以異步進行排序,但會開啟非常多的進程,從而導致進程調度繁忙,機器假死 # 需要修復下 # sort_merge.sh split_file_prefix='sp_' rm -rf ${split_file_prefix}*; time split -l 1000000 -d ../short-target.csv -a 4 ${split_file_prefix}; i=0 for file in ${split_file_prefix}*; do { sort -o $file $file; } & # 每開5個進程,就等一下 (( i=$i + 1 )) b=$(( $i % 5 )) if [ $b = 0 ] ; then # 小優化: 只要上一個進程退出就繼續,而不是等到所有進程退出再繼續 time wait $! # time wait fi; done; time wait; time sort -m ${split_file_prefix}* -o target.txt; # 以上運行下來,耗時9min+, 比未優化時還要差, 尷尬! real 9m54.076s user 19m1.480s sys 0m36.016s
看起來沒啥優勢啊, 咋整? 咱們試着調下參試試!
# 1. 將單個文件設置為50w行記錄, 耗時如下: # 額, 沒跑完, 反正沒啥提升, 單個文件排序由之前的2s左右, 上升到了11s左右 # 2. 將單個設置為20w行記錄試試: # 單個文件排序上升到了4.xs, 也不理想啊; real 9m2.948s user 21m25.373s sys 0m27.067s
增加下並行度試試!
# 增加並行度到10 real 9m3.569s user 21m4.346s sys 0m27.519s # 單文件行數 50w, 並行10個進程 real 8m12.916s user 21m40.624s sys 0m20.988s
看起來效果更差了,或者差不多. 難道這參數咋調整也沒用了么? 算了, 不搞了.
4. 換個性能好的機器試試
前面的機器太差了,也沒了信心。干脆換一個機器試試(與此同時我們應該得出一個診斷,排序是非常耗資源的,你應該要考慮其影響性問題,比如如何分配資源,出現異常情況如何處理,而不要為了這一小功能的調優而讓整個應用處理危險之中)。下面我們直接進入優化參數環節:(僅為理論調優,實際應用請當心)
# 單文件50w行, 5進程 real 5m6.348s user 5m38.684s sys 0m44.997s # 單文件100w行, 5進程 real 2m39.386s user 3m34.682s sys 0m23.157s # 單文件100w行, 10進程 real 2m22.223s user 3m41.079s sys 0m25.418s # 以上結論是行數更容易影響結果, 因排序是計算型密集型任務, 進程數也許等於CPU核數比較優的選擇 # 不過也有一個干擾項:即文件的讀取寫入是IO開銷,此時2倍以上的CPU核數進程可能是更好的選擇 # 單文件100w行, 10進程, 7100w總數(1.6G) # 使用原始排序 sort real 6m14.485s user 5m10.292s sys 0m13.042s # 使用 LC_ALL=C 測試結果 real 2m1.992s user 1m18.041s sys 0m11.254s # 使用分治排序, 100w行, 10進程 real 2m53.637s user 4m22.170s sys 0m29.979s
好吧,linux的優化估計只能到這里了。總結: 1. LC_ALL=C 很好用;2. 並行優化很困難。
5. 自行實現大文件排序
看起來shell幫不了太多忙了,咋整?用java實現?線程池可以很好利用隊列先后問題;但到底有多少優勢呢?試試就試試!
多線程可以很方便地並行,比如在分片文件的同時,其他線程就可以同進行排序了,也許等分片完成,排序就ok了呢!
簡單時序可表示為: split拆分線程拆分文件 -> file1拆得一個個文件 -> submitSortTask(file1) 立即提交排序任務-> sort(file1) -> submitMergeTask(file1) 立即提交歸並任務 -> wait (所有merge都完成)。也就是說所有過程都並行化了或者管道化了(pipeline),拆分一個文件就可以進行排序文件,排序完一個文件就可以合並一個文件。
線程模型是這樣的: 1個拆分線程 -> n個排序線程 -> 1個歸並線程;任務執行完成的前提是:n個排序線程完成 + n個歸並任務完成;
大體思路是這樣,優勢如何得看數據說話。但我想還可以優化的是,歸並線程是否可以維護一個指針,代表最后一次插入的結果,以便可以快速比較插入;(這可能有點困難,另外就是歸並都要寫新文件,這里可能是個瓶頸點,因為如果有1000次歸並,就會存在讀寫大文件的過程,如果前面的文件分片存在速度慢的問題,那么此處的反復寫更是一個瓶頸點,即使是用linux的sort進行歸並也要花2min+,更不用說一次次地寫讀了)
代碼實現:(以下實現的歸並任務沒有並行,而是簡單化一次合並,時間復雜度為 O(m*n),其中m為分片文件個數;)(文件分片時間復雜度為 O(n))(小文件排序基於Arrays.sort(), 時間復雜有點復雜,暫且定為O(n*logn) 吧)
import lombok.extern.log4j.Log4j2; import org.apache.commons.io.FileUtils; import java.io.*; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @Log4j2 public class BigFileSortHelper { /** * 排序分片線程池 */ private static final ExecutorService sortSplitThreadPool = Executors.newFixedThreadPool(5, new NamedThreadFactory("Sort-Thread-")); /** * 歸並線程池 */ private static final ExecutorService mergeThreadPool = Executors.newSingleThreadExecutor( new NamedThreadFactory("Merge-Thread-")); /** * 排序大文件(正序) * * @param filePath 文件路徑 * @throws IOException 讀取寫入異常拋出 */ public static void sortBigFile(String filePath) throws IOException { // split -> f1 -> submit(f1) -> sort(f1) -> merge(f1) -> wait (所有merge都完成) int perFileLines = 20_0000; List<SplitFileDescriptor> splitFiles = new ArrayList<>(); String tmpDir = "/tmp/sort_" + System.currentTimeMillis(); try(FileReader reader = new FileReader(new File(filePath))) { BufferedReader bufferedReader = new BufferedReader(reader); String lineData; SplitFileDescriptor splitFile = null; AtomicLong lineNumberCounter = new AtomicLong(0); while ((lineData = bufferedReader.readLine()) != null) { if(lineNumberCounter.get() % perFileLines == 0) { submitSortTask(splitFile); splitFile = rolloverSplitFile(splitFile, lineNumberCounter.get(), perFileLines, tmpDir); splitFiles.add(splitFile); } writeLine(splitFile, lineData, lineNumberCounter); } // 提交最后一個分片文件排序 submitSortTask(splitFile); for (SplitFileDescriptor sp : splitFiles) { try { sp.getFuture().get(); } catch (Exception e) { log.error("排序分片文件結果異常", e); } } List<String> subFilePathList = splitFiles.stream() .map(SplitFileDescriptor::getFullFilePath) .collect(Collectors.toList()); mergeSortedFile(subFilePathList, "/tmp/merge_" + System.currentTimeMillis() + ".txt"); FileUtils.deleteQuietly(new File(tmpDir)); } } /** * 排序指定文件 * * @param originalFilePath 要排序的文件 * @return 排序后的文件位置(可為原地排序) * @throws IOException 讀取寫入異常時拋出 */ private static String sortFile(String originalFilePath) throws IOException { List<String> lines = FileUtils.readLines( new File(originalFilePath), "utf-8"); lines.sort(String::compareTo); FileUtils.writeLines(new File(originalFilePath), lines); return originalFilePath; } /** * 滾動生成一個新的分片文件 * * @param lastFile 上一個輸出的分片文件 * @param currentLineNum 當前總記錄行數 * @param perFileLines 單文件可容納行數 * @param tmpDir 存放臨時文件的目錄(分片文件) * @return 分片文件的信息 * @throws IOException 文件打開異常拋出 */ private static SplitFileDescriptor rolloverSplitFile(SplitFileDescriptor lastFile, long currentLineNum, int perFileLines, String tmpDir) throws IOException { if(lastFile != null) { lastFile.close(); } int splitFileNo = (int) (currentLineNum / perFileLines); String formattedFileName = String.format("sp_%04d", splitFileNo); return SplitFileDescriptor.newSplit(tmpDir, formattedFileName); } /** * 提交排序任務 * * @param splitFile 單個小分片文件實例 */ private static void submitSortTask(SplitFileDescriptor splitFile) { if(splitFile == null) { return; } Future<?> sortFuture = sortSplitThreadPool.submit(() -> { try { sortFile(splitFile.getFullFilePath()); } catch (IOException e) { log.error("排序單文件時發生了異常" + splitFile.getFullFilePath(), e); } }); splitFile.setFuture(sortFuture); } /** * 合並有序文件 * * @param splitFilePathList 子分片文件列表 * @param outputPath 結果文件存放路徑 * @throws IOException 讀寫異常時拋出 */ public static long mergeSortedFile(List<String> splitFilePathList, String outputPath) throws IOException { List<BufferedReader> bufferedReaderList = new ArrayList<>(splitFilePathList.size()); splitFilePathList.forEach(r -> { FileReader reader = null; try { reader = new FileReader(new File(r)); BufferedReader buffFd = new BufferedReader(reader); bufferedReaderList.add(buffFd); } catch (FileNotFoundException e) { log.error("文件讀取異常", e); } }); String[] onlineDataShards = new String[bufferedReaderList.size()]; int i = 0; for ( ; i < bufferedReaderList.size(); i++ ) { BufferedReader reader = bufferedReaderList.get(i); onlineDataShards[i] = reader.readLine(); } String lastLineData = null; AtomicLong lineNumCounter = new AtomicLong(0); try(OutputStream targetOutput = FileUtils.openOutputStream(new File(outputPath))) { while (true) { int minIndex = 0; for (int j = 1; j < onlineDataShards.length; j++) { // 最小的文件已被迭代完成 if(onlineDataShards[minIndex] == null) { minIndex = j; continue; } // 后一文件已被迭代完成 if(onlineDataShards[j] == null) { continue; } if (onlineDataShards[minIndex].compareTo(onlineDataShards[j]) > 0) { minIndex = j; } } // 所有文件都已迭代完成 if(onlineDataShards[minIndex] == null) { break; } String minData = onlineDataShards[minIndex]; // 去重 if(!minData.equals(lastLineData)) { writeLine(targetOutput, minData, lineNumCounter); lastLineData = minData; } // 迭代下一行 onlineDataShards[minIndex] = bufferedReaderList.get(minIndex).readLine(); } } for (BufferedReader reader : bufferedReaderList) { reader.close(); } return lineNumCounter.get(); } /** * 寫單行數據到輸出流 */ private static void writeLine(SplitFileDescriptor splitFile, String lineData, AtomicLong lineNumCounter) throws IOException { if(splitFile == null) { throw new RuntimeException("分片文件為空"); } OutputStream outputStream = splitFile.getOutputStream(); writeLine(outputStream, lineData, lineNumCounter); } /** * 寫單行數據到輸出流 */ private static void writeLine(OutputStream outputStream, String lineData, AtomicLong lineNumCounter) throws IOException { outputStream.write(lineData.getBytes()); outputStream.write("\n".getBytes()); lineNumCounter.incrementAndGet(); } /** * 分片文件描述 */ private static class SplitFileDescriptor { String subFileName; String fullFilePath; OutputStream outputStream; Future<?> future; public SplitFileDescriptor(String mainDir, String subFileName) throws IOException { this.subFileName = subFileName; if(!mainDir.endsWith("/")) { mainDir += "/"; } this.fullFilePath = mainDir + subFileName; this.outputStream = FileUtils.openOutputStream(new File(fullFilePath)); } public static SplitFileDescriptor newSplit(String mainDir, String subFileName) throws IOException { return new SplitFileDescriptor(mainDir, subFileName); } public void close() throws IOException { outputStream.close(); } public void setFuture(Future<?> future) { this.future = future; } public String getSubFileName() { return subFileName; } public void setSubFileName(String subFileName) { this.subFileName = subFileName; } public String getFullFilePath() { return fullFilePath; } public void setFullFilePath(String fullFilePath) { this.fullFilePath = fullFilePath; } public OutputStream getOutputStream() { return outputStream; } public void setOutputStream(OutputStream outputStream) { this.outputStream = outputStream; } public Future<?> getFuture() { return future; } } /** * 簡單命命名線程生成工廠類 */ private static class NamedThreadFactory implements ThreadFactory { private AtomicInteger counter = new AtomicInteger(0); private String threadNamePrefix; public NamedThreadFactory(String prefix) { this.threadNamePrefix = prefix; } @Override public Thread newThread(Runnable r) { return new Thread(r, threadNamePrefix + counter.incrementAndGet()); } } }
老實說,個人覺得思路還算不差,而且這段代碼看起來還是不錯的。
但是效果如何? 我用 無言以對 來回答,分片階段就跟蝸牛一樣,真的是性能差得不行,具體數據就不說了,反正比原始的 sort 性能還要差。雖然還有很多優化的地方,比如使用nio,mmap。。。 但終究太費力。
6. 基本內存分片的快速排序實現
有一個巧用,可以直接將讀取的分片數據直接丟到排序線程池排序,然后再寫文件,這樣減少了寫分片與重新讀分片的io消耗,鐵定能提升不少性能,這是為性能的鋌而走險,其風險點是內存數據過大將帶來不可逆轉不可抗力!
完善點的實現如下:(使用內存排序,nio讀取文件)
import lombok.extern.slf4j.Slf4j; import org.apache.commons.io.FileUtils; import org.apache.commons.io.LineIterator; import java.io.*; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.stream.Collectors; @Slf4j public class BigFileSortHelper { /** * 排序分片線程池 */ private static final ExecutorService sortSplitThreadPool = Executors.newFixedThreadPool(5, new NamedThreadFactory("Sort-Thread-")); /** * 歸並線程池 */ private static final ExecutorService mergeThreadPool = Executors.newSingleThreadExecutor( new NamedThreadFactory("Merge-Thread-")); /** * 排序大文件(正序) * * @param filePath 文件路徑 * @throws IOException 讀取寫入異常拋出 */ public static String sortBigFile(String filePath) throws IOException { // split -> f1 -> submit(f1) -> sort(f1) -> merge(f1) -> wait (所有merge都完成) long startTime = System.currentTimeMillis(); String sortedFilePath = "/tmp/merge_" + startTime + ".txt"; String tmpDir = "/tmp/sort_" + startTime; // splitBigFileAndSubmitSortUseBufferReader(filePath, tmpDir); List<SplitFileDescriptor> splitFiles = splitBigFileAndSubmitSortUseApacheIoItr(filePath, tmpDir); log.info("分片流程處理完成, 分片文件個數:{}, 耗時:{}ms", splitFiles.size(), (System.currentTimeMillis() - startTime)); for (SplitFileDescriptor sp : splitFiles) { try { sp.getFuture().get(); } catch (Exception e) { log.error("排序分片文件結果異常", e); } } log.info("所有子分片文件排序完成, 耗時:{}ms", (System.currentTimeMillis() - startTime)); List<String> subFilePathList = splitFiles.stream() .map(SplitFileDescriptor::getFullFilePath) .collect(Collectors.toList()); // long totalLines = mergeSortedFileUseBufferReader(subFilePathList, sortedFilePath); long totalLines = mergeSortedFileUseApacheIoItr(subFilePathList, sortedFilePath); log.info("文件歸並完成, 總寫入行數:{}, 耗時:{}ms", totalLines, (System.currentTimeMillis() - startTime)); boolean delSuccess = FileUtils.deleteQuietly(new File(tmpDir)); if(!delSuccess) { log.warn("清理文件夾失敗:{}", tmpDir); } return sortedFilePath; } /** * 使用bufferReader讀取大文件內容並分片並提交排序線程 * * @param filePath 大文件路徑 * @param tmpDir 切分的小文件路徑 * @return 分片好的文件信息列表 * @throws IOException 讀取異常拋出 */ private static List<SplitFileDescriptor> splitBigFileAndSubmitSortUseBufferReader(String filePath, String tmpDir) throws IOException { int perFileLines = 20_0000; List<SplitFileDescriptor> splitFiles = new ArrayList<>(); String lineData; SplitFileDescriptor splitFile = null; AtomicLong lineNumberCounter = new AtomicLong(0); // 使用bufferdReader讀取文件分片 try(FileReader reader = new FileReader(new File(filePath))) { BufferedReader bufferedReader = new BufferedReader(reader); while ((lineData = bufferedReader.readLine()) != null) { if(lineNumberCounter.get() % perFileLines == 0) { submitSortTask(splitFile); splitFile = rolloverSplitFile(splitFile, lineNumberCounter.get(), perFileLines, tmpDir); splitFiles.add(splitFile); } writeLine(splitFile, lineData, lineNumberCounter); } } // 提交最后一個分片文件排序 submitSortTask(splitFile); return splitFiles; } /** * 讀取大文件分片並提交排序(使用apache io組件實現) * * @see #splitBigFileAndSubmitSortUseBufferReader(String, String) */ private static List<SplitFileDescriptor> splitBigFileAndSubmitSortUseApacheIoItr(String filePath, String tmpDir) throws IOException { int perFileLines = 20_0000; List<SplitFileDescriptor> splitFiles = new ArrayList<>(); String lineData; SplitFileDescriptor splitFile = null; AtomicLong lineNumberCounter = new AtomicLong(0); // 使用apache io 類庫讀取文件分片, 比直接使用 bufferedReader 性能好 LineIterator lineItr = FileUtils.lineIterator(new File(filePath)); try { while (lineItr.hasNext()) { lineData = lineItr.nextLine(); if(lineNumberCounter.get() % perFileLines == 0) { submitSortTask(splitFile); splitFile = rolloverSplitFile(splitFile, lineNumberCounter.get(), perFileLines, tmpDir); splitFiles.add(splitFile); } writeLine(splitFile, lineData, lineNumberCounter); } } finally { LineIterator.closeQuietly(lineItr); } // 提交最后一個分片文件排序 submitSortTask(splitFile); return splitFiles; } /** * 排序指定文件 * * @param splitFile 要排序的分片文件 * @return 排序后的文件位置(可為原地排序) * @throws IOException 讀取寫入異常時拋出 */ private static String sortSplitFile(SplitFileDescriptor splitFile) throws IOException { String originalFilePath = splitFile.getFullFilePath(); List<String> lines = splitFile.readLineDataBuffer(); if(lines == null) { lines = FileUtils.readLines( new File(originalFilePath), "utf-8"); } lines.sort(String::compareTo); FileUtils.writeLines(new File(originalFilePath), lines); return originalFilePath; } /** * 滾動生成一個新的分片文件 * * @param lastFile 上一個輸出的分片文件 * @param currentLineNum 當前總記錄行數 * @param perFileLines 單文件可容納行數 * @param tmpDir 存放臨時文件的目錄(分片文件) * @return 分片文件的信息 * @throws IOException 文件打開異常拋出 */ private static SplitFileDescriptor rolloverSplitFile(SplitFileDescriptor lastFile, long currentLineNum, int perFileLines, String tmpDir) throws IOException { if(lastFile != null) { lastFile.close(); } int splitFileNo = (int) (currentLineNum / perFileLines); String formattedFileName = String.format("sp_%04d", splitFileNo); return SplitFileDescriptor.newSplit(tmpDir, formattedFileName); } /** * 提交排序任務 * * @param splitFile 單個小分片文件實例 */ private static void submitSortTask(SplitFileDescriptor splitFile) { if(splitFile == null) { return; } Future<?> sortFuture = sortSplitThreadPool.submit(() -> { try { sortSplitFile(splitFile); } catch (IOException e) { log.error("排序單文件時發生了異常" + splitFile.getFullFilePath(), e); } }); splitFile.setFuture(sortFuture); } /** * 合並有序文件 * * @param splitFilePathList 子分片文件列表 * @param outputPath 結果文件存放路徑 * @throws IOException 讀寫異常時拋出 */ public static long mergeSortedFileUseBufferReader(List<String> splitFilePathList, String outputPath) throws IOException { List<BufferedReader> bufferedReaderList = new ArrayList<>(splitFilePathList.size()); splitFilePathList.forEach(r -> { FileReader reader = null; try { reader = new FileReader(new File(r)); BufferedReader buffFd = new BufferedReader(reader); bufferedReaderList.add(buffFd); } catch (FileNotFoundException e) { log.error("文件讀取異常", e); } }); String[] onlineDataShards = new String[bufferedReaderList.size()]; int i = 0; for ( ; i < bufferedReaderList.size(); i++ ) { BufferedReader reader = bufferedReaderList.get(i); onlineDataShards[i] = reader.readLine(); } String lastLineData = null; AtomicLong lineNumCounter = new AtomicLong(0); try(OutputStream targetOutput = FileUtils.openOutputStream(new File(outputPath))) { while (true) { int minIndex = 0; for (int j = 1; j < onlineDataShards.length; j++) { // 后一文件已被迭代完成 if(onlineDataShards[j] == null) { continue; } // 最小的文件已被迭代完成 if(onlineDataShards[minIndex] == null) { minIndex = j; continue; } if (onlineDataShards[j].compareTo(onlineDataShards[minIndex]) < 0) { minIndex = j; } } // 所有文件都已迭代完成 if(onlineDataShards[minIndex] == null) { break; } String minData = onlineDataShards[minIndex]; // 去重 if(!minData.equals(lastLineData)) { writeLine(targetOutput, minData, lineNumCounter); lastLineData = minData; } // 迭代下一行 onlineDataShards[minIndex] = bufferedReaderList.get(minIndex).readLine(); } } for (BufferedReader reader : bufferedReaderList) { reader.close(); } return lineNumCounter.get(); } /** * 合並有序文件(使用 apache-io 的 lineIterator) * * @see #mergeSortedFileUseBufferReader(List, String) */ public static long mergeSortedFileUseApacheIoItr(List<String> splitFilePathList, String outputPath) throws IOException { List<LineIterator> bufferedReaderList = new ArrayList<>(splitFilePathList.size()); splitFilePathList.forEach(r -> { try { bufferedReaderList.add( FileUtils.lineIterator(new File(r))); } catch (IOException e) { log.error("文件讀取異常", e); } }); String[] onlineDataShards = new String[bufferedReaderList.size()]; int i = 0; for ( ; i < bufferedReaderList.size(); i++ ) { LineIterator reader = bufferedReaderList.get(i); onlineDataShards[i] = reader.nextLine(); } log.info("准備merge文件個數:{}", bufferedReaderList.size()); String lastLineData = null; int lastLineFd = -1; // 第二大的文件,用於二次快速比較大小 int lastSecondBigLineFd = -1; AtomicLong lineNumCounter = new AtomicLong(0); try(OutputStream targetOutput = FileUtils.openOutputStream(new File(outputPath))) { while (true) { int minIndex = 0; String lastSecondBigLineData = lastSecondBigLineFd == -1 ? null : onlineDataShards[lastSecondBigLineFd]; // 比上一次第二小的值還小,則就是當前的最小值沒錯了 // 第二小的值不那么好找,預留,待后續再完善吧 String newReadLineData = lastLineFd == -1 ? null : onlineDataShards[lastLineFd]; if(newReadLineData != null && (newReadLineData.equals(lastLineData) || (lastSecondBigLineData != null && newReadLineData.compareTo(lastSecondBigLineData) <= 0))) { minIndex = lastLineFd; } else if (lastSecondBigLineData != null) { minIndex = lastSecondBigLineFd; lastSecondBigLineFd = -1; } else { // 重新搜索最小值對應文件 List<Integer> swappedFds = new ArrayList<>(); for (int j = 1; j < onlineDataShards.length; j++) { // 后一文件已被迭代完成 String curShardLineData = onlineDataShards[j]; if(curShardLineData == null) { continue; } // 最小的文件已被迭代完成 if(onlineDataShards[minIndex] == null) { minIndex = j; continue; } if (curShardLineData.compareTo(onlineDataShards[minIndex]) < 0) { swappedFds.add(minIndex); minIndex = j; } // 與上一次最小一樣,就不要再往下比較了,是最小沒錯 if(onlineDataShards[minIndex].equals(lastLineData)) { break; } } } lastLineFd = minIndex; // 所有文件都已迭代完成 if(onlineDataShards[minIndex] == null) { break; } String minData = onlineDataShards[minIndex]; // 去重 if(!minData.equals(lastLineData)) { writeLine(targetOutput, minData, lineNumCounter); log.info("write lineData: " + minData); lastLineData = minData; } // 迭代下一行 LineIterator fdItr = bufferedReaderList.get(minIndex); if(fdItr.hasNext()) { onlineDataShards[minIndex] = fdItr.nextLine(); continue; } onlineDataShards[minIndex] = null; } } int closedFileNum = 0; for (LineIterator reader : bufferedReaderList) { LineIterator.closeQuietly(reader); if(reader != null) { closedFileNum++; } } log.info("關閉分片文件個數:{}", closedFileNum); return lineNumCounter.get(); } /** * 寫單行數據到輸出流 */ private static void writeLine(SplitFileDescriptor splitFile, String lineData, AtomicLong lineNumCounter) throws IOException { if(splitFile == null) { throw new RuntimeException("分片文件為空"); } splitFile.writeLine(lineData); lineNumCounter.incrementAndGet(); } /** * 寫單行數據到輸出流 */ private static void writeLine(OutputStream outputStream, String lineData, AtomicLong lineNumCounter) throws IOException { outputStream.write(lineData.getBytes()); outputStream.write("\n".getBytes()); lineNumCounter.incrementAndGet(); } /** * 分片文件描述 */ private static class SplitFileDescriptor { String subFileName; String fullFilePath; OutputStream outputStream; Future<?> future; /** * 用於存放緩沖數據 */ List<String> lineDataBuffer; public SplitFileDescriptor(String mainDir, String subFileName) throws IOException { this.subFileName = subFileName; if(!mainDir.endsWith("/")) { mainDir += "/"; } this.fullFilePath = mainDir + subFileName; } public static SplitFileDescriptor newSplit(String mainDir, String subFileName) throws IOException { return new SplitFileDescriptor(mainDir, subFileName); } public void close() throws IOException { outputStream.close(); } public void setFuture(Future<?> future) { this.future = future; } public String getFullFilePath() { return fullFilePath; } public void writeLine(String lineData) throws IOException { if(lineDataBuffer == null) { lineDataBuffer = new ArrayList<>(); } lineDataBuffer.add(lineData); // outputStream.write(lineData.getBytes()); // outputStream.write("\n".getBytes()); } public Future<?> getFuture() { return future; } /** * 讀緩沖數據(單向不可逆) */ public List<String> readLineDataBuffer() { List<String> buf = lineDataBuffer; resetBuffer(); return buf; } /** * 重置緩沖, 避免內存溢出 */ void resetBuffer() { lineDataBuffer = null; } } /** * 簡單命命名線程生成工廠類 */ private static class NamedThreadFactory implements ThreadFactory { private AtomicInteger counter = new AtomicInteger(0); private String threadNamePrefix; public NamedThreadFactory(String prefix) { this.threadNamePrefix = prefix; } @Override public Thread newThread(Runnable r) { return new Thread(r, threadNamePrefix + counter.incrementAndGet()); } } }
具體就是,使用apache的io包進行文件讀取(底層基於nio),另外,將大文件分片的結果優先寫到分片緩沖中,直接丟入排序線程,排序非常快。所以當分片完成時,基本上排序也就完成了。而歸並的過程,則是一個插入排序的過程,消耗也主要文件讀取io,使用一個lastLineData作為去重的實現,在內容重復度很高時,該操作非常有用。上面的優化,基本可以提供4倍左右的性能,還是不錯的。就是會存在一定風險:如當io足夠快時,很可能排序線程就跟不上,從而導致內存撐爆了;另外如果外部請求排序的任務較多時,也會導致內容耗光,這都是極其危險的。
單元測試如下:
@Slf4j public class BigFileSortHelperTest { @Test public void testSortFile1() throws IOException { long startTime = System.currentTimeMillis(); log.info("start sort process."); String sortedFilePath = BigFileSortHelper.sortBigFile("D:\\cygwin64\\home\\Administrator\\1-merged.txt"); log.info("sortedFilePath:" + sortedFilePath); log.info("costTime: " + (System.currentTimeMillis() - startTime) + "ms"); } @Test public void testMergeFunc() throws IOException { long startTime = System.currentTimeMillis(); String splitFilePath = "/tmp/sort_1602922717865"; Collection<File> files = FileUtils.listFiles(new File(splitFilePath), TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE); List<String> splitFileList = files.stream().map(File::getPath).collect(Collectors.toList()); long totalLines = BigFileSortHelper.mergeSortedFileUseApacheIoItr( splitFileList, "/tmp/merge0.txt"); log.info("merge costTime:{}, totalLines:{} ", System.currentTimeMillis() - startTime, totalLines); Assert.assertEquals("去重后的文件行數不對", 33, totalLines); } }
以上實現,還是非常棒的,供諸君參考。