100G大文件單機處理(在100G大文件中找到第一個不重復的字符串,16G內存限制)


Large-File-Processing

問題:

有一個 100GB 的文件,里面內容是文本
要求:

  • 找出第一個不重復的詞
  • 只允許掃一遍原文件
  • 盡量少的 IO
  • 內存限制 16G
  • 隨機字符串,每行一個字符串 (長度范圍從 0-100)。

思路:

  1. 100G字符串,0-100字節隨機,最后換行占兩個字節
  2. 每行是一個byte數組,長度1-100不等(不加上換行符),一個字節8位,所以共有2^800種組合
  3. 所以整個文件不可能直接存到內存中,最壞情況,100G中,每個字符串都不同,第一個字符串就是要尋找的目標字符串
  4. 100G 數據最壞情況下有多少行? 決定用 int 還是 long 表示字符串出現頻率和字符串第一次出現的位置,假設每行一個字符,行數100 * 1024 * 1024 * 1024 / 3 >2^31-1,超出int范圍,若都是相同字符串,則字符串頻率也會超出,java中 long最大為2^63 -1 > 100 * 1024 * 1024 *1024, 所以用long統計足夠。
  5. 按照最壞情況,設要把大文件拆分成 x 份,每份文件中要記錄每行的字符串內容以及在源文件中第一次出現的位置,需要一個long數據轉化成字符串 根據最大值 1024 * 1024 * 1024 * 00/3=35791394133.33333, 需要11個字節,再加一個分隔符占一個字節,總共需要12個字節,此文件讀到內存中要把原來的String類型的統計字符串位置索引的內容轉化成long,每行最多擴大8(long字節數)-1(字符串位置索引最小字節)=7個字節,(其實擴大不了這么多,因為有的是從12個字節減小到8個字節)
  6. 源文件切割份數計算方式如下圖
    image
  7. 接下來就是如何切割使得盡量均勻達到我們設置的內存,最重要的一點是相同字符串要在同一個文件中,這是保證分布運算的關鍵,所以就要用到Hash函數,相同字符串的hash函數值是相同的(我用的java自帶的計算String的hashcode)。但是由於2^800 是一個很大的種類數,還是存在極特殊情況使得小文件分布不均勻,遍歷文件對一次分割文件變大的文件按照所占內存大小重新分割。
  8. 尋找被切割后的盡量少的文件數是為了盡量減少IO
  9. 切割完成后的對每個文件進行處理的算法就比較簡單了,讀文件把其存到內存中,統計每個字符串其出現頻率和第一次出現的位置。每個文件保存一個結果,即頻率為1且最早出現的字符串信息,以后遍歷的每個文件中若有頻率為1且更早索引位置的,將原有結果替換。若文本中無結果,返回字符串"全文無非重復字符串"
  10. 維護一張hashmap在讀取的時候統計詞頻,在內存范圍內,若有詞超過兩個,就不讀入小文件,控制哈希表在14G范圍內,多了就不增加寫入小文件。
  11. 維護一張bitmap,對每個字符串構建hash函數,14G*8=112G的數值范圍已經確保bitmap足夠大,100G字符串平均長度50,只有2G的種類數,112G種對比2G種,不同字符串hash沖突的概率極小,極大概率保證字符串hash值不沖突,【然后從尾到頭讀文件】,字符串計算hash值,查bitmap表,若為0,則置為1,加入候選解,若為1,則刪除候選解。再為0變1,則替換候選解。這種效率很高,查找解的速度就是磁盤讀取速度。(但是有錯誤概率,因為沒有維護候選解,從頭到尾只有一個候選解字符串,另外就是只能從大概率上保證不同字符串的hash值不沖突)
  12. 終極方案,11和12同時進行,同時維護一張bitmap和一張hashmap,hashmap可以作為bitmap的候選解hash表。

可改進的地方

  1. 算法可以優化查詢速度,維護一顆樹或者堆
  2. 讀寫內容時,buffer內存效率值也可以改進,目前根據經驗設置為1M
  3. 考慮雙線程進行讀寫操作,一邊讀一遍處理數據,這個提升了改進buffer的空間,也能提升整個的查詢效率。

使用和運行

新建Project將3個java文件拷入即可,記得修改首行包名

主程序函數入口:FindFisrtX.main

創建文本測試用例main函數入口:FileIO.main


程序github地址


主程序:FindFisrtX.java

import java.io.*;
import java.util.HashMap;
import java.util.Map;


/**
 * 定義字符串信息
 */
class WordsInfo {
    String word;
    long firstApperIndex;
    long frequency;

    public WordsInfo(long firstApperIndex, long frequency) {
        this.firstApperIndex = firstApperIndex;
        this.frequency = frequency;
    }

    public WordsInfo(long frequency) {
        this.frequency = frequency;
    }

    public WordsInfo(String word, long firstApperIndex, long frequency) {
        this.word = word;
        this.firstApperIndex = firstApperIndex;
        this.frequency = frequency;
    }
}

public class FindFirstX {
    /**
     * 主函數入口
     *
     * @param args
     * @throws IOException
     */
    public static void main(String[] args) throws IOException {
        long startTime = System.currentTimeMillis();
        int num_files = 5;// 被分割文件數量
        String sourceFilePath = "G:/wordTest710.txt"; // 100G大文件路徑
        // String sourceFilePath = "D:/面試/pingCAP/test.txt"; // 100G大文件路徑
        // String sourceFilePath = "D:/面試/pingCAP/test.txt"; // 100G大文件路徑
        FileIO.delAllFile("G:/PingCAP");
        String desFolderPath = "G:/PingCAP"; //切割后的小文件存放路徑
        String fileName = "wordShow"; // 小目標文件標准名稱
        String[] strTemp; // 存放字符串與出現位置的數組
        String result = "全文無非重復字符串"; // 保存最終結果
        WordsInfo wordsInfo; //存放每個小文件中最有可能的目標解信息
        Long firstApperIndex = Long.MAX_VALUE;
        FileInputStream inputStream = null;
        BufferedInputStream bis = null;
        BufferedReader reader = null;
        FileIO.cutLargeFile(num_files, sourceFilePath, desFolderPath, fileName, 1024 * 1024 * 40); //按照內存限制切割小文件
        File dirFile = new File(desFolderPath);
        String[] fileList = dirFile.list();
        for (String s : fileList) {
            System.out.println(s);
        }

        for (String fileName_re : fileList) {

            Map<String, WordsInfo> wordsMap = new HashMap<>(); //存單詞的容器
            try {
                inputStream = new FileInputStream(desFolderPath + "/" + fileName_re);
                bis = new BufferedInputStream(inputStream); //帶緩沖數組的輸入流
                reader = new BufferedReader(new InputStreamReader(bis, "utf-8"), 1 * 1024 * 1024);
                String line;
                while ((line = reader.readLine()) != null) {

                    strTemp = line.trim().split("分");
                    KeepWordsToMap(wordsMap, strTemp[0], Long.valueOf(strTemp[1])); // 保存到容器
                }

            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (inputStream != null) {
                    inputStream.close();
                }

                if (reader != null) {
                    reader.close();
                }
                if (bis != null) {
                    bis.close();
                }
            }
            wordsInfo = FindFirstSingleX(wordsMap);
            if (wordsInfo.frequency == 1 && wordsInfo.firstApperIndex < firstApperIndex) {
                firstApperIndex = wordsInfo.firstApperIndex;
                result = wordsInfo.word;
            }
        }
        System.out.println("第一個不重復的字符串為: " + result); // 輸出結果
        long endTime = System.currentTimeMillis();
        System.out.println("程序總運行時間:" + (endTime - startTime) + "ms"); //輸出程序運行時間
    }

    /**
     * method :把每個字符串存進當前map,並記錄其,第一次出現的位置以及出現頻率
     *
     * @param wordsMap
     * @param s
     * @param countIndex
     */
    public static void KeepWordsToMap(Map<String, WordsInfo> wordsMap, String s, Long countIndex) {

        if (wordsMap.get(s) != null)
            wordsMap.replace(s.trim(), new WordsInfo(wordsMap.get(s.trim()).firstApperIndex, wordsMap.get(s.trim()).frequency + 1L));
        else wordsMap.put(s, new WordsInfo(countIndex, 1L));
    }

    /**
     * method:遍歷map,得到第一次出現未重復的解,若無返回默認解
     *
     * @param wordsMap
     * @return
     */
    public static WordsInfo FindFirstSingleX(Map<String, WordsInfo> wordsMap) {
        String result = "";
        long minFirstApperIndex = Long.MAX_VALUE;
        long frequency = 2;
        WordsInfo wordsInfo = new WordsInfo(result, minFirstApperIndex, frequency);
        for (String s : wordsMap.keySet()) {
            if (wordsMap.get(s).frequency == 1 && wordsMap.get(s).firstApperIndex < minFirstApperIndex) {
                wordsInfo.word = s;
                wordsInfo.firstApperIndex = wordsMap.get(s).firstApperIndex;
                minFirstApperIndex = wordsMap.get(s).firstApperIndex;
                wordsInfo.frequency = 1;
            }
        }

        return wordsInfo;
    }

}

FileIO 操作文件的一些方法,以及切割小文件的方法

import java.io.*;
import java.util.HashMap;

public class FileIO {
    // 用於創建測試用例
    public static void main(String[] args) throws IOException {
        long startTime = System.currentTimeMillis();    //獲取開始時間
        float a = 0.15F; // 隨機輸入一個不重復數據
        String str = "";
        File f = new File("G:/wordTest710.txt");
        FileOutputStream fop = new FileOutputStream(f, false);
        OutputStreamWriter writer = new OutputStreamWriter(fop, "UTF-8");
        BufferedWriter bw = new BufferedWriter(writer, 1 * 1024 * 1024);
        for (long i = 0L; i < 800000L; i++) {
            if (a < Math.random()) {
                bw.append("TWODOG");
                bw.append("\r\n");
                a = 2.0F;
            }
            str = Utils.creatWord(1, 100);
            bw.append(str);
            bw.append("\r\n");
            bw.append(str);
            bw.append("\r\n");
            bw.append(str);
            bw.append("\r\n");
            bw.append(str);
            bw.append("\r\n");
            bw.append(str);
            bw.append("\r\n");
            bw.append(str);
            bw.append("\r\n");
        }
        bw.append("xiaoxinniubi");
        bw.append("\r\n");

        writer.flush();
        bw.flush();
        fop.flush();
        writer.close();
        bw.close();
        fop.close();

        System.out.println("完成");
        long endTime = System.currentTimeMillis();    //獲取結束時間
        System.out.println("創建測試用例程序運行時間:" + (endTime - startTime) + "ms");    //輸出程序運行時間

    }

    /**
     * 方法:把字符串寫入文件
     *
     * @param line
     * @param ch
     * @param Index :大文件里出現位置的索引
     */
    public static void WriteToFile(String line, char ch, Long Index, BufferedWriter bw) throws IOException {

        bw.append(line + ch + Index + "\r\n");
    }

    public static void WriteToFile(String line, BufferedWriter bw) throws IOException {

        bw.append(line + "\r\n");
    }

    /**
     * 方法: 把大文件切割成小文件
     *
     * @param num_file       分割后的小文件數量
     * @param sourceFilePath 被分割源文件路徑
     * @param desFolderPath  存放分割后目標文件夾路徑
     * @param fileName       小目標文件標准名稱
     * @param smallFileMem   小文件內存限制
     * @throws IOException
     */
    public static void cutLargeFile(int num_file, String sourceFilePath, String desFolderPath, String fileName, long smallFileMem) throws IOException {
        long hashMapMem = 0L;// 定義讀取文件時候存儲的hashmap空間
        final long tempMapMemLimit = 1024L * 1024L * 1024L * 14L;
        HashMap<String, Long> tempHashMap = new HashMap<>(); //維護一個減少小文件寫入的hash表
        long startTime = System.currentTimeMillis();    //獲取開始時間
        FileInputStream inputStream = null;
        BufferedInputStream bis = null;
        BufferedReader reader = null;
        // int num_file = 26;
        File[] files = new File[num_file];
        FileOutputStream[] fops = new FileOutputStream[num_file];
        OutputStreamWriter[] writers = new OutputStreamWriter[num_file];
        BufferedWriter[] bws = new BufferedWriter[num_file];
        for (int i = 0; i < num_file; i++) {
            files[i] = new File(desFolderPath + "/" + fileName + i + ".txt");
            fops[i] = new FileOutputStream(files[i], true);
            writers[i] = new OutputStreamWriter(fops[i], "UTF-8");
            bws[i] = new BufferedWriter(writers[i], 1 * 1024 * 1024);
        }

        try {
            Long index = 0L; //統計字符串在源文件中的位置
            inputStream = new FileInputStream(sourceFilePath);
            bis = new BufferedInputStream(inputStream); //帶緩沖數組的輸入流
            reader = new BufferedReader(new InputStreamReader(bis, "utf-8"), 1 * 1024 * 1024);
            String line;

            while ((line = reader.readLine()) != null) {
                String trueLine = line.trim();

              /*  System.out.println("tempHashMap.get(trueLine)  " + tempHashMap.get(trueLine));
                System.out.println("hashMapMem < (long)(1024 * 1024 * 1024 * 14) " + (hashMapMem < tempMapMemLimit));
                System.out.println("真假: " + tempHashMap.get(trueLine) == null && hashMapMem < tempMapMemLimit);*/
                if (tempHashMap.get(trueLine) == null && hashMapMem < tempMapMemLimit) {
                    tempHashMap.put(trueLine, 1L);
                    hashMapMem += (8L + 4L + (long) trueLine.length()); // hashcode占4字節,頻率占8字節,字符串占 trueLine.length() 字節
                } else if (tempHashMap.get(trueLine) != null && hashMapMem < tempMapMemLimit) {
                    tempHashMap.put(line.trim(), tempHashMap.get(trueLine) + 1L);
                    hashMapMem += (8L + 4L + (long) trueLine.length());
                }

                if (tempHashMap.get(trueLine) < 2 || (tempHashMap.get(trueLine) == null && hashMapMem > (long) (1024 * 1024 * 1024 * 14))) {
                    int type = trueLine.hashCode() % num_file > 0 ? trueLine.hashCode() % num_file : -trueLine.hashCode() % num_file;
                    //System.out.println("type: " + type);
                    // System.out.println("line.trim().hashCode: " + line.trim().hashCode());
                    FileIO.WriteToFile(trueLine, '分', index, bws[type]);
                    index++;
                }

            }
            for (int i = 0; i < num_file; i++) {
                fops[i].flush();
                writers[i].flush();
                bws[i].flush();
                fops[i].close();
                writers[i].close();
                bws[i].close();
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (inputStream != null) {
                inputStream.close();
            }

            if (reader != null) {
                reader.close();
            }
            if (bis != null) {
                bis.close();
            }
        }
        for (File file : files) {
            FileInputStream inputStream_re = null;
            BufferedInputStream bis_re = null;
            BufferedReader reader_re = null;
            System.out.println(file.length());
            if (file.length() > smallFileMem) {
                int copies = (int) (Math.ceil((double) file.length()) / (double) smallFileMem); // 分成copies份
                //int copies = 2; // 分成copies份
                File[] files_re = new File[copies];
                FileOutputStream[] fops_re = new FileOutputStream[copies];
                OutputStreamWriter[] writers_re = new OutputStreamWriter[copies];
                BufferedWriter[] bws_re = new BufferedWriter[copies];
                for (int i = 0; i < copies; i++) {
                    int fileIndex = i + num_file;
                    files_re[i] = new File(desFolderPath + "/" + fileName + (fileIndex) + ".txt");
                    fops_re[i] = new FileOutputStream(files_re[i], true);
                    writers_re[i] = new OutputStreamWriter(fops_re[i], "UTF-8");
                    bws_re[i] = new BufferedWriter(writers_re[i], 1 * 1024 * 1024);
                }
                try {
                    inputStream_re = new FileInputStream(file.getAbsoluteFile());
                    bis_re = new BufferedInputStream(inputStream_re); //帶緩沖數組的輸入流
                    reader_re = new BufferedReader(new InputStreamReader(bis_re, "utf-8"), 1 * 1024 * 1024);
                    String line;
                    String[] trueStr;//文本中真實字符串

                    while ((line = reader_re.readLine()) != null) {
                        trueStr = line.trim().split("分");
                        int type = Utils.APHash(trueStr[0]) % copies > 0 ? Utils.APHash(trueStr[0]) % copies : -Utils.APHash(trueStr[0]) % copies;
                        FileIO.WriteToFile(line.trim(), bws_re[type]);

                    }
                    for (int i = 0; i < copies; i++) {
                        fops_re[i].flush();
                        writers_re[i].flush();
                        bws_re[i].flush();
                        fops_re[i].close();
                        writers_re[i].close();
                        bws_re[i].close();
                    }

                } catch (IOException e) {
                    e.printStackTrace();
                } finally {
                    if (inputStream_re != null) {
                        inputStream_re.close();
                    }

                    if (reader_re != null) {
                        reader_re.close();
                    }
                    if (bis_re != null) {
                        bis_re.close();
                    }
                }
                num_file = num_file + copies;
                file.delete();
            }
        }

        long endTime = System.currentTimeMillis();
        System.out.println("大文件分成小文件程序運行時間:" + (endTime - startTime) + "ms");

    }


    /**
     * 清空文件夾
     *
     * @param folderPath
     */
    public static void delFolder(String folderPath) {
        try {
            delAllFile(folderPath); //刪除完里面所有內容
            String filePath = folderPath;
            filePath = filePath.toString();
            java.io.File myFilePath = new java.io.File(filePath);
            myFilePath.delete(); //刪除空文件夾
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * 刪除指定文件夾下所有文件
     *
     * @param path 文件夾完整絕對路徑
     * @return
     */
    public static boolean delAllFile(String path) {
        boolean flag = false;
        File file = new File(path);
        if (!file.exists()) {
            return flag;
        }
        if (!file.isDirectory()) {
            return flag;
        }
        String[] tempList = file.list();
        File temp = null;
        for (int i = 0; i < tempList.length; i++) {
            if (path.endsWith(File.separator)) {
                temp = new File(path + tempList[i]);
            } else {
                temp = new File(path + File.separator + tempList[i]);
            }
            if (temp.isFile()) {
                temp.delete();
            }
            if (temp.isDirectory()) {
                delAllFile(path + "/" + tempList[i]);//先刪除文件夾里面的文件
                delFolder(path + "/" + tempList[i]);//再刪除空文件夾
                flag = true;
            }
        }
        return flag;
    }
}


其它一些小工具常見hash函數等 Utils.java

package xin.twodog.PingCAP;

import java.io.File;

public class Utils {

    /**
     * 隨機生成單詞
     *
     * @param min 最小長度
     * @param max 最大長度
     * @return
     */

    public static String creatWord(int min, int max) {
        int count = (int) (Math.random() * (max - min + 1)) + min;
        String str = "";
        for (int i = 0; i < count; i++) {
            str += (char) ((int) (Math.random() * 26) + 'a');
        }
        return str;
    }

    /**
     * 返回文件內存大小
     *
     * @param filePath
     * @return
     * @throws Exception
     */
    public static Long getFileMem(String filePath) {
        File localFile = new File(filePath);
        return localFile.length();
    }


    /**
     * 刪除文件
     *
     * @param filePath
     */
    public static void delFile(String filePath) {
        File localFile = new File(filePath);
        localFile.delete();
    }


    /**
     * DEKHash算法
     *
     * @param str
     * @return
     */
    public static int DEKHash(String str) {
        int hash = str.length();
        for (int i = 0; i < str.length(); i++) {
            hash = ((hash << 5) ^ (hash >> 27)) ^ str.charAt(i);
        }
        return (hash & 0x7FFFFFFF);
    }

    /**
     * APHash算法
     *
     * @param str
     * @return
     */
    public static int APHash(String str) {
        int hash = 0;
        for (int i = 0; i < str.length(); i++) {
            hash ^= ((i & 1) == 0) ? ((hash << 7) ^ str.charAt(i) ^ (hash >> 3)) :
                    (~((hash << 11) ^ str.charAt(i) ^ (hash >> 5)));
        }
        return hash;
    }

    /**
     * 改進的32位FNV算法1
     *
     * @param data 字符串
     * @param data
     * @return int值
     */
    public static int FNVHash1(String data) {
        final int p = 16777619;
        int hash = (int) 2166136261L;
        for (int i = 0; i < data.length(); i++)
            hash = (hash ^ data.charAt(i)) * p;
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        return hash;
    }

    /**
     * JS hash 算法
     *
     * @param str
     * @return
     */
    public static int JSHash(String str) {
        int hash = 1315423911;
        for (int i = 0; i < str.length(); i++) {
            hash ^= ((hash << 5) + str.charAt(i) + (hash >> 2));
        }
        return (hash & 0x7FFFFFFF);
    }
}



免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM