大文件排序優化實踐


  在很多應用場景中,我們都會面臨着排序需求,可以說是見怪不怪。我們也看過許多的排序算法:從最簡單的冒泡排序、選擇排序,到稍微好點的插入排序、希爾排序,再到有點理論的堆排序、快速排序,再到高級的歸並排序、桶排序、基數排序。

  而實際工作中我們可能用到的排序有哪些呢?而且,大部分時序,相信大家都是使用一個現有庫API直接就完成了排序功能。所以,講真,大家還真不一定會很好排序。

  不過本文的目的不是基礎排序算法,而是如何處理數據量的文件的內容排序問題?

1. 多大的文件算大文件?

  多大的文件算大文件?這是個定義的問題,當我每天處理的都是幾百幾千的數據,那么我遇到幾萬的數據后,我可以認為這個數據量是大的。

  但總體來說,我們還是需要定義一個量級的,不然無法評估處理能力問題。

  比如我們定義超過200M的文件算大文件可以不?我們定義超過5000w行的數據算大文件可以不?

  好了,基於這樣大的數據量的時候,也許我們就不能簡單的調用幾個庫函數就解決問題了,至少你load到內存也將存在一定的風險了。

  所以,是時候想想優化的事了!

 

2. 如何利用好現有的工具?

  針對一些問題,我們可以自己做,也可以找別人幫忙做。具體誰來做,這是個問題!

  比如,你自己花個一兩天的時間,寫了個排序算法,搞定了。但是,你能保證你的穩定性嗎?你能經受住生產環境復雜的環境考驗嗎?

  再比如,你可以現有的工具進行操作,如果有人提供了穩定的api函數供調用的話,你可以這么干。如果你的服務是跑在linux環境下,那么,我們有必要試一下系統提供的排序功能。 sort . 這工具絕對是經過無數的考驗的,可以放心使用。它也有豐富的參數供選擇,這對我們的日常工作非常有幫助,但對於一個普通的排序也許我們並不需要。

  比如最簡單的,自然排序:

sort 1-merged.txt -o 1-sorted.txt

  就可以將文件排好序了。但是當數據非常大的時候,比如我使用 7000w+ 的行數(約1.2G)進行排序時,就花費了 6min+ . 也許是我硬件不怎么好,但是實際上它就是會很慢啊!

$ time sort 1-merged.txt -o 1-sorted.txt

real    8m5.709s
user    25m19.652s
sys     0m4.523s

  這種性能,在當今大數據橫行的時代,基本就是胎死腹中了。大數據應對的都是TB/PB 級別的數量,而我們僅為GB級並且沒有做其他業務就已經耗費了這么長時間,這是沒辦法繼續了。讓我進一步深入。

  看到文檔里有說,系統本地化配置影響排序,實際就是存在一個編解碼的問題,它會依據本地的配置來進行轉換字符然后再進行排序。這個開銷可是不小哦,比如我們設置都是中文環境。而要去除這個影響,則可以使用添加 LC_ALL=C 之后就會使用原始的值進行排序,具體影響就是省去轉換編碼的開銷了。那么,我們用這個參數試試。

*** WARNING ***
The locale specified by the environment affects sort order.
Set LC_ALL=C to get the traditional sort order that uses
native byte values.

$ time LC_ALL=C sort 1-merged.txt -o 1-sorted.txt

real    2m52.663s
user    2m7.577s
sys     0m5.146s

  哇,從8分鍾降到了3分鍾,雖然不是數量級的提升,但至少下降了一半以上的時間消耗,還是非常可觀的。到這個地步,也許能滿足我們的場景了。

  但是,請注意一個問題,這里的 LC_ALL=C 之后,就會使用默認的邏輯進行處理了,那么,會有什么影響呢?實際上就是一些本地相關的東西,就會失效了。

  最直接的,就是中文處理的問題了,比如我有一個文件內容是這樣的:

床前明月光,
疑是地上霜。
舉頭望明月,
低頭思故鄉。
天子呼來不上船,
自稱臣是酒中仙。
紅酥手,
黃藤酒,
滿城春色宮牆柳。

  那么,我們使用 LC_ALL=C 設置來排序后,將會得到如下結果:

$ LC_ALL=C sort 1.txt -o 1-s1.txt

$ cat 1-s1.txt
舉頭望明月,
低頭思故鄉。
天子呼來不上船,
滿城春色宮牆柳。
疑是地上霜。
紅酥手,
自稱臣是酒中仙。
黃藤酒,
床前明月光,

  額,看不懂啥意思?中文咋排序的我也給整忘了(而且各自機器上得到的結果也可能不一樣)。好吧,沒關系,我們去掉 LC_ALL=C 來看看結果:

$ sort 1.txt -o 1-s1.txt

$ cat 1-s1.txt
床前明月光,
低頭思故鄉。
紅酥手,
黃藤酒,
舉頭望山月,
滿城春色宮牆柳。
天子呼來不上船,
疑是地上霜。
自稱臣是酒中仙。

  這下看懂了吧,這是按照拼音順序來排序的。所以,你說 LC_ALL=C 重不重要,如果沒有本地化設置,很多東西都是不符合情理的。所以,有時候我們還真不能這么干咯。

  如果真想這么干,除非你確認你的文件里只有英文字符符號和數字,或者是 ASCII 的127 個字符。

 

3. 繞個路高級一下

  前面的方法,不是不能解決問題,而是不能解決所有問題。所以,我們還得繼續想辦法。想想當下對大文件的處理方式都有哪些?實際也不多,並行計算是根本,但我們也許做不了並行計算,但我們可以拆分文件嘛。一個文件太大,我們就文件拆小排序后再合並嘛!就是不知道性能如何?

split -l 100000 -d ../1-merged.txt -a 4 sp_; 
for file in sp_*.txt; do; 
    sort -o $file sorted_$file; 
done; 
sort -m sp_*.txt -o targed.txt;
# 一行化后的格式  
$ time for file in sp_*; do sort -o sorted_$file $file; done; sort -m sorted_* -o targetd.txt;

real    12m15.256s
user    10m11.465s
sys     0m18.623s
# 以上時間僅是單個文件的排序時間還不算歸並的時間,下面這個代碼可以統一計算
$ time `for file in sp_1_*; do sort $file -o sorted_$file; done; sort -m sorted_* -o targetd.txt;`

real    14m27.643s
user    11m13.982s
sys     0m22.636s

  看起來切分小文件后,排序太耗時間了,看看能不能用多進程輔助下!(所以最終我們還是回到了並行計算的問題上了)

# shell 異步運行就是在其后面添加 & 就可以了, 但是最后的歸並是同步的.
$ time `split -l 100000 -d ../1-merged.txt -a 4 sp_ ; for file in sp_* ; do {sort $file -o $file} &; done; wait; sort -m sp_* -o target.txt ; `
# 多處計時監控
$ time `time split -l 100000 -d ../1-merged.txt -a 4 sp_; time for file in sp_1_*; do { sort $file -o $file } & ; done; time wait; time sort -m sp_* -o target.txt;`
# 以上報錯,因為命令行下不允許使用 & 操作, 只能自己寫shell腳本,然后運行了
# sort_merge.sh
time split -l 100000 -d ../1-merged.txt -a 4 sp_;
i=0
for file in sp_*;
do
{
    #echo "sort -o $file $file";
    sort -o $file $file;
} &
done;
time wait;
time sort -m sp_* -o target.txt;
# 以上腳本的確是會以異步進行排序,但會開啟非常多的進程,從而導致進程調度繁忙,機器假死 # 需要修復下 # sort_merge.sh
split_file_prefix='sp_'
rm -rf ${split_file_prefix}*;
time split -l 1000000 -d ../short-target.csv -a 4 ${split_file_prefix};
i=0
for file in ${split_file_prefix}*;
do
{
    sort -o $file $file;
} & # 每開5個進程,就等一下
    (( i=$i + 1 ))
    b=$(( $i % 5 ))
    if [ $b = 0 ] ; then
        # 小優化: 只要上一個進程退出就繼續,而不是等到所有進程退出再繼續
        time wait $!
        # time wait
    fi;
done;
time wait;
time sort -m ${split_file_prefix}* -o target.txt;
# 以上運行下來,耗時9min+, 比未優化時還要差, 尷尬!
real    9m54.076s
user    19m1.480s
sys     0m36.016s

  看起來沒啥優勢啊, 咋整? 咱們試着調下參試試!

# 1. 將單個文件設置為50w行記錄, 耗時如下:
# 額, 沒跑完, 反正沒啥提升, 單個文件排序由之前的2s左右, 上升到了11s左右
# 2. 將單個設置為20w行記錄試試: # 單個文件排序上升到了4.xs, 也不理想啊;
real    9m2.948s
user    21m25.373s
sys     0m27.067s

  增加下並行度試試!

# 增加並行度到10
real    9m3.569s
user    21m4.346s
sys     0m27.519s
# 單文件行數 50w, 並行10個進程
real    8m12.916s
user    21m40.624s
sys     0m20.988s

  看起來效果更差了,或者差不多. 難道這參數咋調整也沒用了么? 算了, 不搞了.

 

4. 換個性能好的機器試試

  前面的機器太差了,也沒了信心。干脆換一個機器試試(與此同時我們應該得出一個診斷,排序是非常耗資源的,你應該要考慮其影響性問題,比如如何分配資源,出現異常情況如何處理,而不要為了這一小功能的調優而讓整個應用處理危險之中)。下面我們直接進入優化參數環節:(僅為理論調優,實際應用請當心)

# 單文件50w行, 5進程
real    5m6.348s
user    5m38.684s
sys     0m44.997s
# 單文件100w行, 5進程
real    2m39.386s
user    3m34.682s
sys     0m23.157s
# 單文件100w行, 10進程
real    2m22.223s
user    3m41.079s
sys     0m25.418s
# 以上結論是行數更容易影響結果, 因排序是計算型密集型任務, 進程數也許等於CPU核數比較優的選擇 # 不過也有一個干擾項:即文件的讀取寫入是IO開銷,此時2倍以上的CPU核數進程可能是更好的選擇 # 單文件100w行, 10進程, 7100w總數(1.6G)
# 使用原始排序 sort
real    6m14.485s
user    5m10.292s
sys     0m13.042s
# 使用 LC_ALL=C 測試結果
real    2m1.992s
user    1m18.041s
sys     0m11.254s
# 使用分治排序, 100w行, 10進程
real    2m53.637s
user    4m22.170s
sys     0m29.979s

  好吧,linux的優化估計只能到這里了。總結: 1. LC_ALL=C 很好用;2. 並行優化很困難。

 

5. 自行實現大文件排序

  看起來shell幫不了太多忙了,咋整?用java實現?線程池可以很好利用隊列先后問題;但到底有多少優勢呢?試試就試試!

多線程可以很方便地並行,比如在分片文件的同時,其他線程就可以同進行排序了,也許等分片完成,排序就ok了呢!

  簡單時序可表示為: split拆分線程拆分文件 -> file1拆得一個個文件 -> submitSortTask(file1) 立即提交排序任務-> sort(file1) -> submitMergeTask(file1) 立即提交歸並任務 -> wait (所有merge都完成)。也就是說所有過程都並行化了或者管道化了(pipeline),拆分一個文件就可以進行排序文件,排序完一個文件就可以合並一個文件。

  線程模型是這樣的: 1個拆分線程 -> n個排序線程 -> 1個歸並線程;任務執行完成的前提是:n個排序線程完成 + n個歸並任務完成;

  大體思路是這樣,優勢如何得看數據說話。但我想還可以優化的是,歸並線程是否可以維護一個指針,代表最后一次插入的結果,以便可以快速比較插入;(這可能有點困難,另外就是歸並都要寫新文件,這里可能是個瓶頸點,因為如果有1000次歸並,就會存在讀寫大文件的過程,如果前面的文件分片存在速度慢的問題,那么此處的反復寫更是一個瓶頸點,即使是用linux的sort進行歸並也要花2min+,更不用說一次次地寫讀了)

  代碼實現:(以下實現的歸並任務沒有並行,而是簡單化一次合並,時間復雜度為 O(m*n),其中m為分片文件個數;)(文件分片時間復雜度為 O(n))(小文件排序基於Arrays.sort(), 時間復雜有點復雜,暫且定為O(n*logn) 吧)

import lombok.extern.log4j.Log4j2;
import org.apache.commons.io.FileUtils;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

@Log4j2
public class BigFileSortHelper {

    /**
     * 排序分片線程池
     */
    private static final ExecutorService sortSplitThreadPool
                            = Executors.newFixedThreadPool(5,
                                new NamedThreadFactory("Sort-Thread-"));

    /**
     * 歸並線程池
     */
    private static final ExecutorService mergeThreadPool
                            = Executors.newSingleThreadExecutor(
                                new NamedThreadFactory("Merge-Thread-"));

    /**
     * 排序大文件(正序)
     *
     * @param filePath 文件路徑
     * @throws IOException 讀取寫入異常拋出
     */
    public static void sortBigFile(String filePath) throws IOException {
        // split -> f1 -> submit(f1) -> sort(f1) -> merge(f1) -> wait (所有merge都完成)
        int perFileLines = 20_0000;
        List<SplitFileDescriptor> splitFiles = new ArrayList<>();
        String tmpDir = "/tmp/sort_" + System.currentTimeMillis();
        try(FileReader reader = new FileReader(new File(filePath))) {
            BufferedReader bufferedReader = new BufferedReader(reader);
            String lineData;
            SplitFileDescriptor splitFile = null;
            AtomicLong lineNumberCounter = new AtomicLong(0);
            while ((lineData = bufferedReader.readLine()) != null) {
                if(lineNumberCounter.get() % perFileLines == 0) {
                    submitSortTask(splitFile);
                    splitFile = rolloverSplitFile(splitFile,
                                    lineNumberCounter.get(), perFileLines, tmpDir);
                    splitFiles.add(splitFile);
                }
                writeLine(splitFile, lineData, lineNumberCounter);
            }
            // 提交最后一個分片文件排序
            submitSortTask(splitFile);
            for (SplitFileDescriptor sp : splitFiles) {
                try {
                    sp.getFuture().get();
                } 
                catch (Exception e) {
                    log.error("排序分片文件結果異常", e);
                }
            }
            List<String> subFilePathList = splitFiles.stream()
                            .map(SplitFileDescriptor::getFullFilePath)
                            .collect(Collectors.toList());
            mergeSortedFile(subFilePathList,
                    "/tmp/merge_" + System.currentTimeMillis() + ".txt");
            FileUtils.deleteQuietly(new File(tmpDir));
        }

    }

    /**
     * 排序指定文件
     *
     * @param originalFilePath 要排序的文件
     * @return 排序后的文件位置(可為原地排序)
     * @throws IOException 讀取寫入異常時拋出
     */
    private static String sortFile(String originalFilePath) throws IOException {
        List<String> lines = FileUtils.readLines(
                new File(originalFilePath), "utf-8");
        lines.sort(String::compareTo);
        FileUtils.writeLines(new File(originalFilePath), lines);
        return originalFilePath;
    }

    /**
     * 滾動生成一個新的分片文件
     *
     * @param lastFile 上一個輸出的分片文件
     * @param currentLineNum 當前總記錄行數
     * @param perFileLines 單文件可容納行數
     * @param tmpDir 存放臨時文件的目錄(分片文件)
     * @return 分片文件的信息
     * @throws IOException 文件打開異常拋出
     */
    private static SplitFileDescriptor rolloverSplitFile(SplitFileDescriptor lastFile,
                                                         long currentLineNum,
                                                         int perFileLines,
                                                         String tmpDir) throws IOException {
        if(lastFile != null) {
            lastFile.close();
        }
        int splitFileNo = (int) (currentLineNum / perFileLines);
        String formattedFileName = String.format("sp_%04d", splitFileNo);
        return SplitFileDescriptor.newSplit(tmpDir, formattedFileName);
    }

    /**
     * 提交排序任務
     *
     * @param splitFile 單個小分片文件實例
     */
    private static void submitSortTask(SplitFileDescriptor splitFile) {
        if(splitFile == null) {
            return;
        }
        Future<?> sortFuture = sortSplitThreadPool.submit(() -> {
            try {
                sortFile(splitFile.getFullFilePath());
            }
            catch (IOException e) {
                log.error("排序單文件時發生了異常" + splitFile.getFullFilePath(), e);
            }
        });
        splitFile.setFuture(sortFuture);
    }

    /**
     * 合並有序文件
     *
     * @param splitFilePathList 子分片文件列表
     * @param outputPath 結果文件存放路徑
     * @throws IOException 讀寫異常時拋出
     */
    public static long mergeSortedFile(List<String> splitFilePathList,
                                        String outputPath) throws IOException {
        List<BufferedReader> bufferedReaderList
                = new ArrayList<>(splitFilePathList.size());
        splitFilePathList.forEach(r -> {
            FileReader reader = null;
            try {
                reader = new FileReader(new File(r));
                BufferedReader buffFd = new BufferedReader(reader);
                bufferedReaderList.add(buffFd);
            }
            catch (FileNotFoundException e) {
                log.error("文件讀取異常", e);
            }
        });
        String[] onlineDataShards = new String[bufferedReaderList.size()];
        int i = 0;
        for ( ; i < bufferedReaderList.size(); i++ ) {
            BufferedReader reader = bufferedReaderList.get(i);
            onlineDataShards[i] = reader.readLine();
        }
        String lastLineData = null;
        AtomicLong lineNumCounter = new AtomicLong(0);
        try(OutputStream targetOutput = FileUtils.openOutputStream(new File(outputPath))) {
            while (true) {
                int minIndex = 0;
                for (int j = 1; j < onlineDataShards.length; j++) {
                    // 最小的文件已被迭代完成
                    if(onlineDataShards[minIndex] == null) {
                        minIndex = j;
                        continue;
                    }
                    // 后一文件已被迭代完成
                    if(onlineDataShards[j] == null) {
                        continue;
                    }
                    if (onlineDataShards[minIndex].compareTo(onlineDataShards[j]) > 0) {
                        minIndex = j;
                    }
                }
                // 所有文件都已迭代完成
                if(onlineDataShards[minIndex] == null) {
                    break;
                }
                String minData = onlineDataShards[minIndex];
                // 去重
                if(!minData.equals(lastLineData)) {
                    writeLine(targetOutput, minData, lineNumCounter);
                    lastLineData = minData;
                }
                // 迭代下一行
                onlineDataShards[minIndex]
                        = bufferedReaderList.get(minIndex).readLine();
            }
        }
        for (BufferedReader reader : bufferedReaderList) {
            reader.close();
        }
        return lineNumCounter.get();
    }
    
    /**
     * 寫單行數據到輸出流
     */
    private static void writeLine(SplitFileDescriptor splitFile,
                                  String lineData,
                                  AtomicLong lineNumCounter) throws IOException {
        if(splitFile == null) {
            throw new RuntimeException("分片文件為空");
        }
        OutputStream outputStream = splitFile.getOutputStream();
        writeLine(outputStream, lineData, lineNumCounter);
    }

    /**
     * 寫單行數據到輸出流
     */
    private static void writeLine(OutputStream outputStream,
                                  String lineData,
                                  AtomicLong lineNumCounter) throws IOException {
        outputStream.write(lineData.getBytes());
        outputStream.write("\n".getBytes());
        lineNumCounter.incrementAndGet();
    }

    /**
     * 分片文件描述
     */
    private static class SplitFileDescriptor {
        String subFileName;
        String fullFilePath;
        OutputStream outputStream;
        Future<?> future;

        public SplitFileDescriptor(String mainDir,
                                   String subFileName) throws IOException {
            this.subFileName = subFileName;
            if(!mainDir.endsWith("/")) {
                mainDir += "/";
            }
            this.fullFilePath = mainDir + subFileName;
            this.outputStream = FileUtils.openOutputStream(new File(fullFilePath));
        }

        public static SplitFileDescriptor newSplit(String mainDir,
                                                   String subFileName) throws IOException {
            return new SplitFileDescriptor(mainDir, subFileName);
        }

        public void close() throws IOException {
            outputStream.close();
        }

        public void setFuture(Future<?> future) {
            this.future = future;
        }

        public String getSubFileName() {
            return subFileName;
        }

        public void setSubFileName(String subFileName) {
            this.subFileName = subFileName;
        }

        public String getFullFilePath() {
            return fullFilePath;
        }

        public void setFullFilePath(String fullFilePath) {
            this.fullFilePath = fullFilePath;
        }

        public OutputStream getOutputStream() {
            return outputStream;
        }

        public void setOutputStream(OutputStream outputStream) {
            this.outputStream = outputStream;
        }

        public Future<?> getFuture() {
            return future;
        }
    }

    /**
     * 簡單命命名線程生成工廠類
     */
    private static class NamedThreadFactory implements ThreadFactory {
        private AtomicInteger counter
                = new AtomicInteger(0);
        private String threadNamePrefix;
        public NamedThreadFactory(String prefix) {
            this.threadNamePrefix = prefix;
        }
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r,
                    threadNamePrefix + counter.incrementAndGet());
        }
    }
}

  老實說,個人覺得思路還算不差,而且這段代碼看起來還是不錯的。

  但是效果如何? 我用 無言以對 來回答,分片階段就跟蝸牛一樣,真的是性能差得不行,具體數據就不說了,反正比原始的 sort 性能還要差。雖然還有很多優化的地方,比如使用nio,mmap。。。 但終究太費力。

 

6. 基本內存分片的快速排序實現

  有一個巧用,可以直接將讀取的分片數據直接丟到排序線程池排序,然后再寫文件,這樣減少了寫分片與重新讀分片的io消耗,鐵定能提升不少性能,這是為性能的鋌而走險,其風險點是內存數據過大將帶來不可逆轉不可抗力!

  完善點的實現如下:(使用內存排序,nio讀取文件)

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;

import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;

@Slf4j
public class BigFileSortHelper {

    /**
     * 排序分片線程池
     */
    private static final ExecutorService sortSplitThreadPool
            = Executors.newFixedThreadPool(5,
            new NamedThreadFactory("Sort-Thread-"));

    /**
     * 歸並線程池
     */
    private static final ExecutorService mergeThreadPool
            = Executors.newSingleThreadExecutor(
            new NamedThreadFactory("Merge-Thread-"));

    /**
     * 排序大文件(正序)
     *
     * @param filePath 文件路徑
     * @throws IOException 讀取寫入異常拋出
     */
    public static String sortBigFile(String filePath) throws IOException {
        // split -> f1 -> submit(f1) -> sort(f1) -> merge(f1) -> wait (所有merge都完成)
        long startTime = System.currentTimeMillis();
        String sortedFilePath = "/tmp/merge_" + startTime + ".txt";
        String tmpDir = "/tmp/sort_" + startTime;
//        splitBigFileAndSubmitSortUseBufferReader(filePath, tmpDir);
        List<SplitFileDescriptor> splitFiles =
                splitBigFileAndSubmitSortUseApacheIoItr(filePath, tmpDir);

        log.info("分片流程處理完成, 分片文件個數:{}, 耗時:{}ms",
                    splitFiles.size(), (System.currentTimeMillis() - startTime));
        for (SplitFileDescriptor sp : splitFiles) {
            try {
                sp.getFuture().get();
            }
            catch (Exception e) {
                log.error("排序分片文件結果異常", e);
            }
        }
        log.info("所有子分片文件排序完成, 耗時:{}ms",
                      (System.currentTimeMillis() - startTime));

        List<String> subFilePathList = splitFiles.stream()
                .map(SplitFileDescriptor::getFullFilePath)
                .collect(Collectors.toList());
//        long totalLines = mergeSortedFileUseBufferReader(subFilePathList, sortedFilePath);
        long totalLines =  mergeSortedFileUseApacheIoItr(subFilePathList, sortedFilePath);
        log.info("文件歸並完成, 總寫入行數:{}, 耗時:{}ms",
                        totalLines, (System.currentTimeMillis() - startTime));
        boolean delSuccess = FileUtils.deleteQuietly(new File(tmpDir));
        if(!delSuccess) {
            log.warn("清理文件夾失敗:{}", tmpDir);
        }
        return sortedFilePath;
    }

    /**
     * 使用bufferReader讀取大文件內容並分片並提交排序線程
     *
     * @param filePath 大文件路徑
     * @param tmpDir 切分的小文件路徑
     * @return 分片好的文件信息列表
     * @throws IOException 讀取異常拋出
     */
    private static List<SplitFileDescriptor>
                splitBigFileAndSubmitSortUseBufferReader(String filePath,
                                                         String tmpDir) throws IOException {
        int perFileLines = 20_0000;
        List<SplitFileDescriptor> splitFiles = new ArrayList<>();
        String lineData;
        SplitFileDescriptor splitFile = null;
        AtomicLong lineNumberCounter = new AtomicLong(0);
        // 使用bufferdReader讀取文件分片
        try(FileReader reader = new FileReader(new File(filePath))) {
            BufferedReader bufferedReader = new BufferedReader(reader);
            while ((lineData = bufferedReader.readLine()) != null) {
                if(lineNumberCounter.get() % perFileLines == 0) {
                    submitSortTask(splitFile);
                    splitFile = rolloverSplitFile(splitFile,
                            lineNumberCounter.get(), perFileLines, tmpDir);
                    splitFiles.add(splitFile);
                }
                writeLine(splitFile, lineData, lineNumberCounter);
            }
        }
        // 提交最后一個分片文件排序
        submitSortTask(splitFile);
        return splitFiles;
    }

    /**
     * 讀取大文件分片並提交排序(使用apache io組件實現)
     *
     * @see #splitBigFileAndSubmitSortUseBufferReader(String, String)
     */
    private static List<SplitFileDescriptor>
                splitBigFileAndSubmitSortUseApacheIoItr(String filePath,
                                                        String tmpDir) throws IOException {
        int perFileLines = 20_0000;
        List<SplitFileDescriptor> splitFiles = new ArrayList<>();
        String lineData;
        SplitFileDescriptor splitFile = null;
        AtomicLong lineNumberCounter = new AtomicLong(0);
        // 使用apache io 類庫讀取文件分片, 比直接使用 bufferedReader 性能好
        LineIterator lineItr = FileUtils.lineIterator(new File(filePath));
        try {
            while (lineItr.hasNext()) {
                lineData = lineItr.nextLine();
                if(lineNumberCounter.get() % perFileLines == 0) {
                    submitSortTask(splitFile);
                    splitFile = rolloverSplitFile(splitFile,
                            lineNumberCounter.get(), perFileLines, tmpDir);
                    splitFiles.add(splitFile);
                }
                writeLine(splitFile, lineData, lineNumberCounter);
            }
        }
        finally {
            LineIterator.closeQuietly(lineItr);
        }
        // 提交最后一個分片文件排序
        submitSortTask(splitFile);
        return splitFiles;
    }

    /**
     * 排序指定文件
     *
     * @param splitFile 要排序的分片文件
     * @return 排序后的文件位置(可為原地排序)
     * @throws IOException 讀取寫入異常時拋出
     */
    private static String sortSplitFile(SplitFileDescriptor splitFile) throws IOException {
        String originalFilePath = splitFile.getFullFilePath();
        List<String> lines = splitFile.readLineDataBuffer();
        if(lines == null) {
            lines = FileUtils.readLines(
                        new File(originalFilePath), "utf-8");
        }
        lines.sort(String::compareTo);
        FileUtils.writeLines(new File(originalFilePath), lines);
        return originalFilePath;
    }

    /**
     * 滾動生成一個新的分片文件
     *
     * @param lastFile 上一個輸出的分片文件
     * @param currentLineNum 當前總記錄行數
     * @param perFileLines 單文件可容納行數
     * @param tmpDir 存放臨時文件的目錄(分片文件)
     * @return 分片文件的信息
     * @throws IOException 文件打開異常拋出
     */
    private static SplitFileDescriptor rolloverSplitFile(SplitFileDescriptor lastFile,
                                                         long currentLineNum,
                                                         int perFileLines,
                                                         String tmpDir) throws IOException {
        if(lastFile != null) {
            lastFile.close();
        }
        int splitFileNo = (int) (currentLineNum / perFileLines);
        String formattedFileName = String.format("sp_%04d", splitFileNo);
        return SplitFileDescriptor.newSplit(tmpDir, formattedFileName);
    }

    /**
     * 提交排序任務
     *
     * @param splitFile 單個小分片文件實例
     */
    private static void submitSortTask(SplitFileDescriptor splitFile) {
        if(splitFile == null) {
            return;
        }
        Future<?> sortFuture = sortSplitThreadPool.submit(() -> {
            try {
                sortSplitFile(splitFile);
            }
            catch (IOException e) {
                log.error("排序單文件時發生了異常" + splitFile.getFullFilePath(), e);
            }
        });
        splitFile.setFuture(sortFuture);
    }

    /**
     * 合並有序文件
     *
     * @param splitFilePathList 子分片文件列表
     * @param outputPath 結果文件存放路徑
     * @throws IOException 讀寫異常時拋出
     */
    public static long mergeSortedFileUseBufferReader(List<String> splitFilePathList,
                                       String outputPath) throws IOException {
        List<BufferedReader> bufferedReaderList
                = new ArrayList<>(splitFilePathList.size());
        splitFilePathList.forEach(r -> {
            FileReader reader = null;
            try {
                reader = new FileReader(new File(r));
                BufferedReader buffFd = new BufferedReader(reader);
                bufferedReaderList.add(buffFd);
            }
            catch (FileNotFoundException e) {
                log.error("文件讀取異常", e);
            }
        });
        String[] onlineDataShards = new String[bufferedReaderList.size()];
        int i = 0;
        for ( ; i < bufferedReaderList.size(); i++ ) {
            BufferedReader reader = bufferedReaderList.get(i);
            onlineDataShards[i] = reader.readLine();
        }
        String lastLineData = null;
        AtomicLong lineNumCounter = new AtomicLong(0);
        try(OutputStream targetOutput = FileUtils.openOutputStream(new File(outputPath))) {
            while (true) {
                int minIndex = 0;
                for (int j = 1; j < onlineDataShards.length; j++) {
                    // 后一文件已被迭代完成
                    if(onlineDataShards[j] == null) {
                        continue;
                    }
                    // 最小的文件已被迭代完成
                    if(onlineDataShards[minIndex] == null) {
                        minIndex = j;
                        continue;
                    }
                    if (onlineDataShards[j].compareTo(onlineDataShards[minIndex]) < 0) {
                        minIndex = j;
                    }
                }
                // 所有文件都已迭代完成
                if(onlineDataShards[minIndex] == null) {
                    break;
                }
                String minData = onlineDataShards[minIndex];
                // 去重
                if(!minData.equals(lastLineData)) {
                    writeLine(targetOutput, minData, lineNumCounter);
                    lastLineData = minData;
                }
                // 迭代下一行
                onlineDataShards[minIndex]
                        = bufferedReaderList.get(minIndex).readLine();
            }
        }
        for (BufferedReader reader : bufferedReaderList) {
            reader.close();
        }
        return lineNumCounter.get();
    }

    /**
     * 合並有序文件(使用 apache-io 的 lineIterator)
     *
     * @see #mergeSortedFileUseBufferReader(List, String)
     */
    public static long mergeSortedFileUseApacheIoItr(List<String> splitFilePathList,
                                                   String outputPath) throws IOException {
        List<LineIterator> bufferedReaderList
                = new ArrayList<>(splitFilePathList.size());
        splitFilePathList.forEach(r -> {
            try {
                bufferedReaderList.add(
                        FileUtils.lineIterator(new File(r)));
            }
            catch (IOException e) {
                log.error("文件讀取異常", e);
            }
        });
        String[] onlineDataShards = new String[bufferedReaderList.size()];
        int i = 0;
        for ( ; i < bufferedReaderList.size(); i++ ) {
            LineIterator reader = bufferedReaderList.get(i);
            onlineDataShards[i] = reader.nextLine();
        }
        log.info("准備merge文件個數:{}", bufferedReaderList.size());
        String lastLineData = null;
        int lastLineFd = -1;
        // 第二大的文件,用於二次快速比較大小
        int lastSecondBigLineFd = -1;
        AtomicLong lineNumCounter = new AtomicLong(0);
        try(OutputStream targetOutput = FileUtils.openOutputStream(new File(outputPath))) {
            while (true) {
                int minIndex = 0;
                String lastSecondBigLineData = lastSecondBigLineFd == -1
                                                    ? null
                                                    : onlineDataShards[lastSecondBigLineFd];
                // 比上一次第二小的值還小,則就是當前的最小值沒錯了
                // 第二小的值不那么好找,預留,待后續再完善吧
                String newReadLineData = lastLineFd == -1 ? null : onlineDataShards[lastLineFd];
                if(newReadLineData != null &&
                        (newReadLineData.equals(lastLineData)
                            || (lastSecondBigLineData != null
                                && newReadLineData.compareTo(lastSecondBigLineData) <= 0))) {
                    minIndex = lastLineFd;
                }
                else if (lastSecondBigLineData != null) {
                    minIndex = lastSecondBigLineFd;
                    lastSecondBigLineFd = -1;
                }
                else {
                    // 重新搜索最小值對應文件
                    List<Integer> swappedFds = new ArrayList<>();
                    for (int j = 1; j < onlineDataShards.length; j++) {
                        // 后一文件已被迭代完成
                        String curShardLineData = onlineDataShards[j];
                        if(curShardLineData == null) {
                            continue;
                        }
                        // 最小的文件已被迭代完成
                        if(onlineDataShards[minIndex] == null) {
                            minIndex = j;
                            continue;
                        }
                        if (curShardLineData.compareTo(onlineDataShards[minIndex]) < 0) {
                            swappedFds.add(minIndex);
                            minIndex = j;
                        }
                        // 與上一次最小一樣,就不要再往下比較了,是最小沒錯
                        if(onlineDataShards[minIndex].equals(lastLineData)) {
                            break;
                        }

                    }
                }
                lastLineFd = minIndex;
                // 所有文件都已迭代完成
                if(onlineDataShards[minIndex] == null) {
                    break;
                }
                String minData = onlineDataShards[minIndex];
                // 去重
                if(!minData.equals(lastLineData)) {
                    writeLine(targetOutput, minData, lineNumCounter);
                    log.info("write lineData: " + minData);
                    lastLineData = minData;
                }
                // 迭代下一行
                LineIterator fdItr = bufferedReaderList.get(minIndex);
                if(fdItr.hasNext()) {
                    onlineDataShards[minIndex] = fdItr.nextLine();
                    continue;
                }
                onlineDataShards[minIndex] = null;
            }
        }
        int closedFileNum = 0;
        for (LineIterator reader : bufferedReaderList) {
            LineIterator.closeQuietly(reader);
            if(reader != null) {
                closedFileNum++;
            }
        }
        log.info("關閉分片文件個數:{}", closedFileNum);
        return lineNumCounter.get();
    }

    /**
     * 寫單行數據到輸出流
     */
    private static void writeLine(SplitFileDescriptor splitFile,
                                  String lineData,
                                  AtomicLong lineNumCounter) throws IOException {
        if(splitFile == null) {
            throw new RuntimeException("分片文件為空");
        }
        splitFile.writeLine(lineData);
        lineNumCounter.incrementAndGet();
    }

    /**
     * 寫單行數據到輸出流
     */
    private static void writeLine(OutputStream outputStream,
                                  String lineData,
                                  AtomicLong lineNumCounter) throws IOException {
        outputStream.write(lineData.getBytes());
        outputStream.write("\n".getBytes());
        lineNumCounter.incrementAndGet();
    }

    /**
     * 分片文件描述
     */
    private static class SplitFileDescriptor {
        String subFileName;
        String fullFilePath;
        OutputStream outputStream;
        Future<?> future;

        /**
         * 用於存放緩沖數據
         */
        List<String> lineDataBuffer;

        public SplitFileDescriptor(String mainDir,
                                   String subFileName) throws IOException {
            this.subFileName = subFileName;
            if(!mainDir.endsWith("/")) {
                mainDir += "/";
            }
            this.fullFilePath = mainDir + subFileName;
        }

        public static SplitFileDescriptor newSplit(String mainDir,
                                                   String subFileName) throws IOException {
            return new SplitFileDescriptor(mainDir, subFileName);
        }

        public void close() throws IOException {
            outputStream.close();
        }

        public void setFuture(Future<?> future) {
            this.future = future;
        }

        public String getFullFilePath() {
            return fullFilePath;
        }

        public void writeLine(String lineData) throws IOException {
            if(lineDataBuffer == null) {
                lineDataBuffer = new ArrayList<>();
            }
            lineDataBuffer.add(lineData);
//            outputStream.write(lineData.getBytes());
//            outputStream.write("\n".getBytes());
        }

        public Future<?> getFuture() {
            return future;
        }

        /**
         * 讀緩沖數據(單向不可逆)
         */
        public List<String> readLineDataBuffer() {
            List<String> buf = lineDataBuffer;
            resetBuffer();
            return buf;
        }

        /**
         * 重置緩沖, 避免內存溢出
         */
        void resetBuffer() {
            lineDataBuffer = null;
        }
    }

    /**
     * 簡單命命名線程生成工廠類
     */
    private static class NamedThreadFactory implements ThreadFactory {
        private AtomicInteger counter
                = new AtomicInteger(0);
        private String threadNamePrefix;
        public NamedThreadFactory(String prefix) {
            this.threadNamePrefix = prefix;
        }
        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r,
                    threadNamePrefix + counter.incrementAndGet());
        }
    }
}

  具體就是,使用apache的io包進行文件讀取(底層基於nio),另外,將大文件分片的結果優先寫到分片緩沖中,直接丟入排序線程,排序非常快。所以當分片完成時,基本上排序也就完成了。而歸並的過程,則是一個插入排序的過程,消耗也主要文件讀取io,使用一個lastLineData作為去重的實現,在內容重復度很高時,該操作非常有用。上面的優化,基本可以提供4倍左右的性能,還是不錯的。就是會存在一定風險:如當io足夠快時,很可能排序線程就跟不上,從而導致內存撐爆了;另外如果外部請求排序的任務較多時,也會導致內容耗光,這都是極其危險的。

  單元測試如下:

@Slf4j
public class BigFileSortHelperTest {

    @Test
    public void testSortFile1() throws IOException {
        long startTime = System.currentTimeMillis();
        log.info("start sort process.");
        String sortedFilePath = BigFileSortHelper.sortBigFile("D:\\cygwin64\\home\\Administrator\\1-merged.txt");
        log.info("sortedFilePath:" + sortedFilePath);
        log.info("costTime: " + (System.currentTimeMillis() - startTime) + "ms");
    }

    @Test
    public void testMergeFunc() throws IOException {
        long startTime = System.currentTimeMillis();
        String splitFilePath = "/tmp/sort_1602922717865";
        Collection<File> files = FileUtils.listFiles(new File(splitFilePath),
                                    TrueFileFilter.INSTANCE, TrueFileFilter.INSTANCE);
        List<String> splitFileList = files.stream().map(File::getPath).collect(Collectors.toList());
        long totalLines = BigFileSortHelper.mergeSortedFileUseApacheIoItr(
                                splitFileList, "/tmp/merge0.txt");
        log.info("merge costTime:{}, totalLines:{} ",
                System.currentTimeMillis() - startTime, totalLines);
        Assert.assertEquals("去重后的文件行數不對", 33, totalLines);
    }
}

  以上實現,還是非常棒的,供諸君參考。

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM