超大文件(1TB)統計訪問次數最多的來源IP及訪問次數


題目解讀

1. 文件格式:訪問時間,來源IP,響應結果,響應耗時

2. 文件大小:超大規模,TB數量級

解題思路

首先,數據量過大,通過內存計算肯定是不可行的。

考慮采用分治,將大文件切割成小文件,再對小文件分析,找出訪問次數最多的,此時將問題轉化為:切割小文件算法

具體思路如下:

將1T的文件按照IP的高8位(代碼是按照高8位實現的,ipv4的高位地址不均勻,按照低8位>比較合理)分隔成2^8份。
每一份寫入到文件名為"tmp_{高8位地址}"的文件中,文件中的數據為低24位的整型字符串(踢出了高8位,以方便用int類型表示)。
 開始順序處理每個"tmp_{高8位地址}"文件:
 1. 申請一塊2^24大小的int內存塊(arr)
 2. 初始化內存塊為0值
 3. 讀取每一行數據,轉換成整型值后做為下標i,將arr[i]++,表示出現了一次
 4. 重復3操作,一直到處理完整個文件為止
 5. 遍歷arr,找出最大值的下標maxCountIndex
 6. 遍歷arr,找出所有與最大值相同的值,將ip和出現次數寫入到maxIpCountMap(用於存儲每個文件中出現次數最多的IP地址及出現次數)中
 7. 重復2-6步驟,一直到所有切割文件處理完畢
 8. 遍歷一遍maxIpCountMap,找出IP最大次數max(所有IP中,出現最多的次數)
 9. 重新遍歷一遍maxIpCountMap,將所有出現次數等於max的IP和次數打印出來

上代碼

import java.io.*;
import java.net.URISyntaxException;
import java.util.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * 所有用到的輔助數據:
 * 1. 2^8個切割文件
 * 2. 2^24個整數的數組,大小為64M
 * 3. 2^8個文件對象,用於加快查詢
 * 4. 用於存儲每個文件中最大值的HashMap(沒有考慮極端情況,如果1T的文件中所有IP出現次數相同,這個HashMap就太大了,方案得重新調整)
 *
 * Created by ronghantao on 2019/3/4.
 */
public class TopN {
    private static int MASK_TOP8 = 0XFF000000; //獲取高8位的掩碼
    private static int MAST_LEFT24 = 0X00FFFFFF; //獲取低24位掩碼
    //ip地址的pattern
    private static String PATTERN_FILTER = "(2(5[0-5]{1}|[0-4]\\d{1})|[0-1]?\\d{1,2})(\\.(2(5[0-5]{1}|[0-4]\\d{1})|[0-1]?\\d{1,2})){3}";

    //用於存儲文件,再次讀取的時候,用這里的數據
    private File[] files = new File[0xFF];

    //每個分隔文件中,數值最大的存儲於此
    private Map<String, Integer> maxIpCountMap = new HashMap<>();

    private static Pattern r = Pattern.compile(PATTERN_FILTER);

    /**
     * 串接整個流程
     */
    public void printTopCountList(String fileName) throws IOException, URISyntaxException {
        //開始切分文件
        this.splitFile(fileName);
        //針對每個文件,進行計算了,找出每個文件中數量最大的IP
        this.countTopN();
        //開始找出數量最多的ip列表
        if (this.maxIpCountMap.size() == 0) {
            //空的
            System.out.println("empty list.");
            return;
        }
        int max = 0;
        for (String k : this.maxIpCountMap.keySet()) {
            if (this.maxIpCountMap.get(k) > max) {
                max = this.maxIpCountMap.get(k);
            }
        }
        //開始打印所有的最大值列表了
        for (String k : this.maxIpCountMap.keySet()) {
            if (this.maxIpCountMap.get(k) == max) {
                System.out.println("ip: " + k + ", count: " + this.maxIpCountMap.get(k));
            }
        }
    }

    /**
     * 計算top1的IP地址
     */
    public void countTopN() throws IOException {
        int[] arr = new int[0x00FFFFFF];
        for (int i = 0; i < 0xFF; i++) {
            //先初始化buffer,都初始化成0
            for (int j = 0; j < arr.length; j++) {
                arr[j] = 0;
            }
            //開始處理文件
            File toProcessFile = this.files[i];
            BufferedReader r = new BufferedReader(new InputStreamReader(new FileInputStream(toProcessFile)));
            try {
                String line;
                boolean flag = false; //標識文件是否為空
                while ((line = r.readLine()) != null) {
                    //讀到數據了,開始處理
                    arr[Integer.valueOf(line)]++; //直接將對應的下標+1即可
                    flag = true;//有數據了
                }
                if (flag == false){
                    continue; //沒有讀到數據,繼續處理其他文件
                }
                //處理完后,找到本文件中最大的值的下標
                int maxCountIndex = 0;
                for (int j = 1; j < arr.length; j++) {
                    if (arr[j] > arr[maxCountIndex]) {
                        maxCountIndex = j;
                    }
                }
                //然后找出所有與此最大值相同的ip列表,寫入到treeMap中
                for (int j = 0; j < arr.length; j++) {
                    if (arr[j] == arr[maxCountIndex]) {
                        //寫入到treeMap,需要做IP轉換
                        this.maxIpCountMap.put(this.longToIp(((long) j) + (((long) (i + 1) << 24))), arr[j]);
                    }
                }
            } finally {
                r.close();
            }
        }
    }

    public void splitFile(String fileName) throws URISyntaxException, IOException {
        //todo 文件合法性校驗
        InputStreamReader inputStreamReader = new InputStreamReader(TopN.class.getClassLoader().getResourceAsStream(fileName));
        BufferedReader reader = new BufferedReader(inputStreamReader);
        BufferedWriter[] writers = new BufferedWriter[0xFF]; //保存每個文件是在什么位置,使用writer,不需要每次都打開了
        for (int i = 0; i < 0xFF; i++) {
            File f = new File("tmp_" + Integer.toHexString(i+1).toUpperCase());
            writers[i] = new BufferedWriter(new OutputStreamWriter(new FileOutputStream(f)));
            this.files[i] = f;
        }
        try {
            String line;
            //一行一行的讀文件,然后找到第一個匹配的ip地址
            while ((line = reader.readLine()) != null) {
                Matcher m = r.matcher(line);
                if (!m.find()) {
                    System.out.println(line + " not match!");
                    continue;
                }
                String ipString = m.group();
                //將大文件按照高8位的規則進行分片,存儲到分片文件中
                //獲取IP的分片信息
                long ipLong = this.ipToLong(ipString);
                int fileIndex = this.getTop8Int(ipLong) - 1; //下標從0開始,需要-1操作
                BufferedWriter w = writers[fileIndex];
                //將ip剩余的24位整型值寫入到文件中
                w.write(String.valueOf(this.getLeft24Int(ipLong))+"\n");
            }
        } finally {
            //關閉所有文件
            for (int i = 0; i < 0xFF; i++) {
                if (writers[i] != null) {
                    try {
                        writers[i].close();
                    } catch (IOException e) {
                        //todo nothing
                    }
                }
            }
            if (reader != null) {
                reader.close();
            }
            if (inputStreamReader != null) {
                inputStreamReader.close();
            }
        }
    }

    public int getLeft24Int(long ip) {
        return (int) (ip & MAST_LEFT24);
    }

    public int getTop8Int(long ip) {
        return (int) ((ip & MASK_TOP8) >> 24);
    }

    public long ipToLong(String ipStr) {
        long result = 0;
        String[] ipAddressInArray = ipStr.split("\\.");
        for (int i = 3; i >= 0; i--) {
            long ip = Long.parseLong(ipAddressInArray[3 - i]);
            result |= ip << (i * 8);
        }
        return result;
    }

    /**
     * long轉換成ip地址格式
     *
     * @param ip ip的long表示
     * @return 點分隔ip字符串
     */
    public String longToIp(long ip) {
        return ((ip >> 24) & 0xFF) + "."
                + ((ip >> 16) & 0xFF) + "."
                + ((ip >> 8) & 0xFF) + "."
                + (ip & 0xFF);
    }
}

遺留問題

對於以上算法,如果1TB的文件中,訪問量最高的IP不多,不會出現問題,否則,也會占用較大的內存。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM