修改Flume-NG的hdfs sink解析時間戳源碼大幅提高寫入性能


轉自:http://www.cnblogs.com/lxf20061900/p/4014281.html

Flume-NG中的hdfs sink的路徑名(對應參數"hdfs.path",不允許為空)以及文件前綴(對應參數"hdfs.filePrefix")支持正則解析時間戳自動按時間創建目錄及文件前綴。

  在實際使用中發現Flume內置的基於正則的解析方式非常耗時,有非常大的提升空間。如果你不需要配置按時間戳解析時間,那這篇文章對你用處不大,hdfs sink對應的解析時間戳的代碼位於org.apache.flume.sink.hdfs.HDFSEventSink的process()方法中,涉及兩句代碼:  

1 // reconstruct the path name by substituting place holders
2         String realPath = BucketPath.escapeString(filePath, event.getHeaders(),
3             timeZone, needRounding, roundUnit, roundValue, useLocalTime);
4         String realName = BucketPath.escapeString(fileName, event.getHeaders(),
5           timeZone, needRounding, roundUnit, roundValue, useLocalTime);

  其中,realPath是正則解析時間戳之后的完整路徑名,filePath參數就是配置文件中的"hdfs.path";realName就是正則解析時間戳之后的文件名前綴,fileName參數就是配置文件中的"hdfs.filePrefix"。其他參數都相同,event.getHeaders()是一個Map里面有時間戳(可以通過interceptor、自定義、使用hdfs sink的useLocalTimeStamp參數三種方式來設置),其他參數是時區、是否四舍五入以及時間單位等。

  BucketPath.escapeString這個方法就是正則解析時間戳所在,具體代碼我們不再分析,現在我們編寫一個程序測試一下BucketPath.escapeString這個方法的性能,運行這個測試類要么在源碼中:

public class Test {public static void main(String[] args) {
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("timestamp", Long.toString(System.currentTimeMillis()));
        String filePath = "hdfs://xxxx.com:8020/data/flume/%Y-%m-%d";
        String fileName = "%H-%M";
        long start = System.currentTimeMillis();
        System.out.println("start time is:" + start);
        for (int i = 0; i < 2400000; i++) {
        String realPath = BucketPath.escapeString(filePath, headers, null, false, Calendar.SECOND, 1, false);
        String realName = BucketPath.escapeString(fileName, headers, null, false, Calendar.SECOND, 1, false);
        }
     long end = System.currentTimeMillis();
     System.out.println("end time is:"+ end + ".\nTotal time is:" + (end - start) + " ms.");
   }
}

  這個方法后面5個參數我們一般不需要用到,因此這里其實都設置成在實際中沒有影響的數值了。headers參數要有“timestamp”參數,我們這里循環處理240W個event,看看運行結果:

start time is:1412853253889
end time is:1412853278210.
Total time is:24321 ms.

  我靠,居然花了24s還多,尼瑪要知道哥目前白天的數據量也就是每秒4W個event,這還不是峰值呢。。。加上解析時間戳全量就扛不住了,怎么辦??

  能怎么辦啊?只能想辦法替換這個解析辦法了,於是,我就想到這樣了,看測試程序:

public class Test {

    private static SimpleDateFormat sdfYMD = null;
    private static SimpleDateFormat sdfHM = null;
    
    public static void main(String[] args) {
        
        sdfYMD = new SimpleDateFormat("yyyy-MM-dd");
        sdfHM = new SimpleDateFormat("HH-mm");
        HashMap<String, String> headers = new HashMap<String, String>();
        headers.put("timestamp", Long.toString(System.currentTimeMillis()));
        String filePath = "hdfs://dm056.tj.momo.com:8020/data/flume/%Y-%m-%d";
        String fileName = "%H-%M";
        long start = System.currentTimeMillis();
        System.out.println("start time is:" + start);
        for (int i = 0; i < 2400000; i++) {
            //String realPath = BucketPath.escapeString(filePath, headers, null, false, Calendar.SECOND, 1, false);
            //String realName = BucketPath.escapeString(fileName, headers, null, false, Calendar.SECOND, 1, false);
            
            String realPath = getTime("yyyy-MM-dd",Long.parseLong(headers.get("timestamp")));
            String realName = getTime("HH-mm",Long.parseLong(headers.get("timestamp")));
        }
        long end = System.currentTimeMillis();
        System.out.println("end time is:"+ end + ".\nTotal time is:" + (end - start) + " ms.");
    }

    public static String getTime(String format,long timestamp) {
        String time="";
        if(format.equals("HH-mm"))
            time=sdfHM.format(timestamp);
        else if(format.equals("yyyy-MM-dd"))
            time=sdfYMD.format(timestamp);        
        return time;
    }
}

  我們使用java自己的SimpleDateFormat來完成按指定格式的解析,這樣就不能將整個path或者name傳進去了,看看運行結果:

start time is:1412853670246
end time is:1412853672204.
Total time is:1958 ms.

  尼瑪!!!不是吧,不到2s。。。我這是在我的MBP上測試的,i5+8G+128G SSD,騷年你還猶豫什么呢?

  來開始改動源碼吧。。。

  我們最好把解析格式做成可配置的,並且最好還保留原來的可以加前綴名的方式,因為有可能需要加入主機名啊什么的,但是可以把這個前綴作為中綴,解析時間戳的結果作為前綴。。。

  1、我們需要兩個SimpleDateFormat來分別實現對path和name的格式化,並在配置時完成實例化,這樣可以創建一次對象就Ok,還需要path和name的格式化串,這個可以做成全局的或者局部的,我們這是全局的(其實沒有必要,是不是?哈哈),變量聲明階段代碼:

private SimpleDateFormat sdfPath = null;        //for file in hdfs path
    private SimpleDateFormat sdfName = null;        //for file name prefix

    private String filePathFormat;
    private String fileNameFormat;

  2、configure(Context context)方法中需要對上述對象進行配置了,很簡單,很明顯,相關代碼如下:

filePath = Preconditions.checkNotNull(
                context.getString("hdfs.path"), "hdfs.path is required");
        filePathFormat =  context.getString("hdfs.path.format", "yyyy/MM/dd");        //time's format ps:"yyyy-MM-dd"
        sdfPath = new SimpleDateFormat(filePathFormat);
        fileName = context.getString("hdfs.filePrefix", defaultFileName);
        fileNameFormat = context.getString("hdfs.filePrefix.format", "HHmm");
        sdfName = new SimpleDateFormat(fileNameFormat);

  增加的是上面的3、4、6、7四行代碼,解析格式串是在"hdfs.path.format"和"hdfs.filePrefix.format"中進行配置,其它的地方不要存在時間戳格式串了,也不要出現原來內置的那些%H、%mm等等格式了。上面兩個format配置有默認格式串,自己做決定就好。

  3、增加解析時間戳方法:

public String getTime(String type,long timestamp) {
        String time="";
        if(type.equals("name"))
            time=sdfName.format(timestamp);
        else if(type.equals("path"))
            time=sdfPath.format(timestamp);
        return time;
    }

  參數type用來指定是文件名的還是路徑名的,用來調用相應地格式化對象。

  4、下面是重點了,上面幾步即使配置了,不在這修改也不會起任何作用,修改process()方法,用以下代碼替換最上面提到的兩行代碼:

String realPath = filePath;
                String realName = fileName;
                if(realName.equals("%host") && event.getHeaders().get("host") != null)
                    realName = event.getHeaders().get("host").toString();
                if(event.getHeaders().get("timestamp") != null){
                    long time = Long.parseLong(event.getHeaders().get("timestamp"));
                    realPath += DIRECTORY_DELIMITER + getTime("path",time);
                    realName = getTime("name",time) + "." + realName;
                }

  這幾行的邏輯其實有:A、可以自定義中綴("hdfs.filePrefix",可以是常量或者是"%host",后者用來獲取主機名,前提是要設置hostinterceptor);B、默認中綴就是默認的"FlumeData";C、如果headers中存在時間戳,調用getTime方法解析時間戳。

  5、編譯&打包&替換&運行。。。

  哥打包比較原始,因為只修改了一個類,就把編譯后的class文件以HDFSEventSink開頭的幾個class文件替換了原來flume-hdfs-sink的jar包中的對應的class文件。。。尼瑪,原始吧。。。會maven,直接上maven吧。。。

  我這邊的測試結果是如果沒有配置壓縮功能,性能提升超過70%,如果配置上壓縮功能(gzip)性能提升超過50%,這數值僅供參考,不同環境不同主機不同人品可能不盡相同。。

  期待大伙的測試結果。。。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM