轉自:http://www.cnblogs.com/lxf20061900/p/4014281.html
Flume-NG中的hdfs sink的路徑名(對應參數"hdfs.path",不允許為空)以及文件前綴(對應參數"hdfs.filePrefix")支持正則解析時間戳自動按時間創建目錄及文件前綴。
在實際使用中發現Flume內置的基於正則的解析方式非常耗時,有非常大的提升空間。如果你不需要配置按時間戳解析時間,那這篇文章對你用處不大,hdfs sink對應的解析時間戳的代碼位於org.apache.flume.sink.hdfs.HDFSEventSink的process()方法中,涉及兩句代碼:
1 // reconstruct the path name by substituting place holders 2 String realPath = BucketPath.escapeString(filePath, event.getHeaders(), 3 timeZone, needRounding, roundUnit, roundValue, useLocalTime); 4 String realName = BucketPath.escapeString(fileName, event.getHeaders(), 5 timeZone, needRounding, roundUnit, roundValue, useLocalTime);
其中,realPath是正則解析時間戳之后的完整路徑名,filePath參數就是配置文件中的"hdfs.path";realName就是正則解析時間戳之后的文件名前綴,fileName參數就是配置文件中的"hdfs.filePrefix"。其他參數都相同,event.getHeaders()是一個Map里面有時間戳(可以通過interceptor、自定義、使用hdfs sink的useLocalTimeStamp參數三種方式來設置),其他參數是時區、是否四舍五入以及時間單位等。
BucketPath.escapeString這個方法就是正則解析時間戳所在,具體代碼我們不再分析,現在我們編寫一個程序測試一下BucketPath.escapeString這個方法的性能,運行這個測試類要么在源碼中:
public class Test {public static void main(String[] args) { HashMap<String, String> headers = new HashMap<String, String>(); headers.put("timestamp", Long.toString(System.currentTimeMillis())); String filePath = "hdfs://xxxx.com:8020/data/flume/%Y-%m-%d"; String fileName = "%H-%M"; long start = System.currentTimeMillis(); System.out.println("start time is:" + start); for (int i = 0; i < 2400000; i++) { String realPath = BucketPath.escapeString(filePath, headers, null, false, Calendar.SECOND, 1, false); String realName = BucketPath.escapeString(fileName, headers, null, false, Calendar.SECOND, 1, false); } long end = System.currentTimeMillis(); System.out.println("end time is:"+ end + ".\nTotal time is:" + (end - start) + " ms."); } }
這個方法后面5個參數我們一般不需要用到,因此這里其實都設置成在實際中沒有影響的數值了。headers參數要有“timestamp”參數,我們這里循環處理240W個event,看看運行結果:
start time is:1412853253889 end time is:1412853278210. Total time is:24321 ms.
我靠,居然花了24s還多,尼瑪要知道哥目前白天的數據量也就是每秒4W個event,這還不是峰值呢。。。加上解析時間戳全量就扛不住了,怎么辦??
能怎么辦啊?只能想辦法替換這個解析辦法了,於是,我就想到這樣了,看測試程序:
public class Test { private static SimpleDateFormat sdfYMD = null; private static SimpleDateFormat sdfHM = null; public static void main(String[] args) { sdfYMD = new SimpleDateFormat("yyyy-MM-dd"); sdfHM = new SimpleDateFormat("HH-mm"); HashMap<String, String> headers = new HashMap<String, String>(); headers.put("timestamp", Long.toString(System.currentTimeMillis())); String filePath = "hdfs://dm056.tj.momo.com:8020/data/flume/%Y-%m-%d"; String fileName = "%H-%M"; long start = System.currentTimeMillis(); System.out.println("start time is:" + start); for (int i = 0; i < 2400000; i++) { //String realPath = BucketPath.escapeString(filePath, headers, null, false, Calendar.SECOND, 1, false); //String realName = BucketPath.escapeString(fileName, headers, null, false, Calendar.SECOND, 1, false); String realPath = getTime("yyyy-MM-dd",Long.parseLong(headers.get("timestamp"))); String realName = getTime("HH-mm",Long.parseLong(headers.get("timestamp"))); } long end = System.currentTimeMillis(); System.out.println("end time is:"+ end + ".\nTotal time is:" + (end - start) + " ms."); } public static String getTime(String format,long timestamp) { String time=""; if(format.equals("HH-mm")) time=sdfHM.format(timestamp); else if(format.equals("yyyy-MM-dd")) time=sdfYMD.format(timestamp); return time; } }
我們使用java自己的SimpleDateFormat來完成按指定格式的解析,這樣就不能將整個path或者name傳進去了,看看運行結果:
start time is:1412853670246 end time is:1412853672204. Total time is:1958 ms.
尼瑪!!!不是吧,不到2s。。。我這是在我的MBP上測試的,i5+8G+128G SSD,騷年你還猶豫什么呢?
來開始改動源碼吧。。。
我們最好把解析格式做成可配置的,並且最好還保留原來的可以加前綴名的方式,因為有可能需要加入主機名啊什么的,但是可以把這個前綴作為中綴,解析時間戳的結果作為前綴。。。
1、我們需要兩個SimpleDateFormat來分別實現對path和name的格式化,並在配置時完成實例化,這樣可以創建一次對象就Ok,還需要path和name的格式化串,這個可以做成全局的或者局部的,我們這是全局的(其實沒有必要,是不是?哈哈),變量聲明階段代碼:
private SimpleDateFormat sdfPath = null; //for file in hdfs path private SimpleDateFormat sdfName = null; //for file name prefix private String filePathFormat; private String fileNameFormat;
2、configure(Context context)方法中需要對上述對象進行配置了,很簡單,很明顯,相關代碼如下:
filePath = Preconditions.checkNotNull( context.getString("hdfs.path"), "hdfs.path is required"); filePathFormat = context.getString("hdfs.path.format", "yyyy/MM/dd"); //time's format ps:"yyyy-MM-dd" sdfPath = new SimpleDateFormat(filePathFormat); fileName = context.getString("hdfs.filePrefix", defaultFileName); fileNameFormat = context.getString("hdfs.filePrefix.format", "HHmm"); sdfName = new SimpleDateFormat(fileNameFormat);
增加的是上面的3、4、6、7四行代碼,解析格式串是在"hdfs.path.format"和"hdfs.filePrefix.format"中進行配置,其它的地方不要存在時間戳格式串了,也不要出現原來內置的那些%H、%mm等等格式了。上面兩個format配置有默認格式串,自己做決定就好。
3、增加解析時間戳方法:
public String getTime(String type,long timestamp) { String time=""; if(type.equals("name")) time=sdfName.format(timestamp); else if(type.equals("path")) time=sdfPath.format(timestamp); return time; }
參數type用來指定是文件名的還是路徑名的,用來調用相應地格式化對象。
4、下面是重點了,上面幾步即使配置了,不在這修改也不會起任何作用,修改process()方法,用以下代碼替換最上面提到的兩行代碼:
String realPath = filePath; String realName = fileName; if(realName.equals("%host") && event.getHeaders().get("host") != null) realName = event.getHeaders().get("host").toString(); if(event.getHeaders().get("timestamp") != null){ long time = Long.parseLong(event.getHeaders().get("timestamp")); realPath += DIRECTORY_DELIMITER + getTime("path",time); realName = getTime("name",time) + "." + realName; }
這幾行的邏輯其實有:A、可以自定義中綴("hdfs.filePrefix",可以是常量或者是"%host",后者用來獲取主機名,前提是要設置hostinterceptor);B、默認中綴就是默認的"FlumeData";C、如果headers中存在時間戳,調用getTime方法解析時間戳。
5、編譯&打包&替換&運行。。。
哥打包比較原始,因為只修改了一個類,就把編譯后的class文件以HDFSEventSink開頭的幾個class文件替換了原來flume-hdfs-sink的jar包中的對應的class文件。。。尼瑪,原始吧。。。會maven,直接上maven吧。。。
我這邊的測試結果是如果沒有配置壓縮功能,性能提升超過70%,如果配置上壓縮功能(gzip)性能提升超過50%,這數值僅供參考,不同環境不同主機不同人品可能不盡相同。。
期待大伙的測試結果。。。