1、背景
在實際項目中,輸入數據往往是由許多小文件組成,這里的小文件是指小於HDFS系統Block大小的文件(默認128M), 然而每一個存儲在HDFS中的文件、目錄和塊都映射為一個對象,存儲在NameNode服務器內存中,通常占用150個字節。 如果有1千萬個文件,就需要消耗大約3G的內存空間。如果是10億個文件呢,簡直不可想象。所以在項目開始前, 我們選擇一種適合的方案來解決本項目的小文件問題
2、介紹
本地 D:\data目錄下有 2012-09-17 至 2012-09-23 一共7天的數據集,我們需要將這7天的數據集按日期合並為7個大文件上傳至 HDFS
3、數據
本地 D:\data目錄下的所有數據,如下圖所示
4、分析
基於項目的需求,我們通過下面幾個步驟完成
1、獲取 D:\data目錄下的所有日期路徑,循環所有日期路徑,通過globStatus()方法獲取所有txt格式文件路徑。
2、最后通過IOUtils.copyBytes(in, out, 4096, false)方法將數據集合並為大文件,並上傳至 HDFS
5、實現
自定義RegexAcceptPathFilter類實現 PathFilter,比如只接受D:\data\2012-09-17日期目錄下txt格式的文件
1 /** 2 * @ProjectName FileMerge 3 * @PackageName com.buaa 4 * @ClassName RegexAcceptPathFilter 5 * @Description 接受 regex 格式的文件 6 * @Author 劉吉超 7 * @Date 2016-04-18 21:58:07 8 */ 9 public static class RegexAcceptPathFilter implements PathFilter { 10 private final String regex; 11 12 public RegexAcceptPathFilter(String regex) { 13 this.regex = regex; 14 } 15 16 @Override 17 public boolean accept(Path path) { 18 boolean flag = path.toString().matches(regex); 19 return flag; 20 } 21 }
實現主程序 merge 方法,完成數據集的合並,並上傳至 HDFS
1 /** 2 * 合並 3 * 4 * @param srcPath 源目錄 5 * @param destPath 目標目錄 6 */ 7 public static void merge(String srcPath,String destPath) { 8 try{ 9 // 讀取hadoop文件系統的配置 10 Configuration conf = new Configuration(); 11 12 // 獲取遠端文件系統 13 URI uri = new URI(HDFSUri); 14 FileSystem remote = FileSystem.get(uri, conf); 15 16 // 獲得本地文件系統 17 FileSystem local = FileSystem.getLocal(conf); 18 19 // 獲取data目錄下的所有文件路徑 20 Path[] dirs = FileUtil.stat2Paths(local.globStatus(new Path(srcPath))); 21 22 FSDataOutputStream out = null; 23 FSDataInputStream in = null; 24 25 for (Path dir : dirs) { 26 // 文件名稱 27 String fileName = dir.getName().replace("-", ""); 28 // 只接受目錄下的.txt文件 29 FileStatus[] localStatus = local.globStatus(new Path(dir + "/*"), new RegexAcceptPathFilter("^.*.txt$")); 30 // 獲得目錄下的所有文件 31 Path[] listedPaths = FileUtil.stat2Paths(localStatus); 32 // 輸出路徑 33 Path block = new Path(destPath + "/" + fileName + ".txt"); 34 // 打開輸出流 35 out = remote.create(block); 36 for (Path p : listedPaths) { 37 // 打開輸入流 38 in = local.open(p); 39 // 復制數據 40 IOUtils.copyBytes(in, out, 4096, false); 41 // 關閉輸入流 42 in.close(); 43 } 44 if (out != null) { 45 // 關閉輸出流 46 out.close(); 47 } 48 } 49 }catch(Exception e){ 50 logger.error("", e); 51 } 52 }
6、一些運行代碼
1 /** 2 * main方法 3 * 4 * @param args 5 */ 6 public static void main(String[] args) { 7 merge("D:\\data\\*","/buaa/tv"); 8 }
7、結果

