hadoop小文件合並


1、背景

  在實際項目中,輸入數據往往是由許多小文件組成,這里的小文件是指小於HDFS系統Block大小的文件(默認128M), 然而每一個存儲在HDFS中的文件、目錄和塊都映射為一個對象,存儲在NameNode服務器內存中,通常占用150個字節。 如果有1千萬個文件,就需要消耗大約3G的內存空間。如果是10億個文件呢,簡直不可想象。所以在項目開始前, 我們選擇一種適合的方案來解決本項目的小文件問題

2、介紹

  本地 D:\data目錄下有 2012-09-17 至 2012-09-23 一共7天的數據集,我們需要將這7天的數據集按日期合並為7個大文件上傳至 HDFS

3、數據

  本地 D:\data目錄下的所有數據,如下圖所示

  

4、分析

  基於項目的需求,我們通過下面幾個步驟完成

  1、獲取 D:\data目錄下的所有日期路徑,循環所有日期路徑,通過globStatus()方法獲取所有txt格式文件路徑。

  2、最后通過IOUtils.copyBytes(in, out, 4096, false)方法將數據集合並為大文件,並上傳至 HDFS

5、實現

  自定義RegexAcceptPathFilter類實現 PathFilter,比如只接受D:\data\2012-09-17日期目錄下txt格式的文件

 1 /** 
 2 * @ProjectName FileMerge
 3 * @PackageName com.buaa
 4 * @ClassName RegexAcceptPathFilter
 5 * @Description 接受 regex 格式的文件
 6 * @Author 劉吉超
 7 * @Date 2016-04-18 21:58:07
 8 */
 9 public static class RegexAcceptPathFilter implements PathFilter {
10     private final String regex;
11 
12     public RegexAcceptPathFilter(String regex) {
13         this.regex = regex;
14     }
15 
16     @Override
17     public boolean accept(Path path) {
18         boolean flag = path.toString().matches(regex);
19         return flag;
20     }
21 }

  實現主程序 merge 方法,完成數據集的合並,並上傳至 HDFS

 1 /**
 2  * 合並
 3  * 
 4  * @param srcPath 源目錄
 5  * @param destPath 目標目錄
 6  */
 7 public static void merge(String srcPath,String destPath) {
 8     try{
 9         // 讀取hadoop文件系統的配置
10         Configuration conf = new Configuration();
11         
12         // 獲取遠端文件系統
13         URI uri = new URI(HDFSUri);
14         FileSystem remote = FileSystem.get(uri, conf);
15             
16         // 獲得本地文件系統
17         FileSystem local = FileSystem.getLocal(conf);
18             
19         // 獲取data目錄下的所有文件路徑
20         Path[] dirs = FileUtil.stat2Paths(local.globStatus(new Path(srcPath)));
21             
22         FSDataOutputStream out = null;
23         FSDataInputStream in = null;
24         
25         for (Path dir : dirs) {
26             // 文件名稱
27             String fileName = dir.getName().replace("-", "");
28             // 只接受目錄下的.txt文件
29             FileStatus[] localStatus = local.globStatus(new Path(dir + "/*"), new RegexAcceptPathFilter("^.*.txt$"));
30             // 獲得目錄下的所有文件
31             Path[] listedPaths = FileUtil.stat2Paths(localStatus);
32             // 輸出路徑
33             Path block = new Path(destPath + "/" + fileName + ".txt");
34             // 打開輸出流
35             out = remote.create(block);            
36             for (Path p : listedPaths) {
37                 // 打開輸入流
38                 in = local.open(p);
39                 // 復制數據
40                 IOUtils.copyBytes(in, out, 4096, false);
41                 // 關閉輸入流
42                 in.close();
43             }
44             if (out != null) {
45                 // 關閉輸出流
46                 out.close();
47             }
48         }
49     }catch(Exception e){
50         logger.error("", e);
51     }
52 }

6、一些運行代碼

1 /**
2  * main方法
3  * 
4  * @param args
5  */
6 public static void main(String[] args) {
7     merge("D:\\data\\*","/buaa/tv");
8 }

7、結果

    

如果,您認為閱讀這篇博客讓您有些收獲,不妨點擊一下右下角的【推薦】。
如果,您希望更容易地發現我的新博客,不妨點擊一下左下角的【關注我】。
如果,您對我的博客所講述的內容有興趣,請繼續關注我的后續博客,我是【劉超★ljc】。

本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。

地址:下載


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM