Hadoop MapReduce編程 API入門系列之小文件合並(二十九)


 

 

 

  不多說,直接上代碼。

     Hadoop 自身提供了幾種機制來解決相關的問題,包括HAR,SequeueFile和CombineFileInputFormat。

 

 

 

Hadoop 自身提供的幾種小文件合並機制

Hadoop HAR

        將眾多小文件打包成一個大文件進行存儲,並且打包后原來的文件仍然可以通過Map-reduce進行操作,打包后的文件由索引和存儲兩大部分組成

        缺點:一旦創建就不能修改,也不支持追加操作,還不支持文檔壓縮,當有新文件進來以后,需要重新打包。
 
 
 

SequeuesFile

        Sequence file由一系列的二進制key/value組成,如果key為小文件名,value為文件內容,則可以將大批小文件合並成一個大文件。

        優缺點:對小文件的存取都比較自由,也不限制用戶和文件的多少,但是該方法不能使用append方法,所以適合一次性寫入大量小文件的場景。
 
 
 

CombineFileInputFormat

        CombineFileInputFormat是一種新的inputformat,用於將多個文件合並成一個單獨的split作為輸入,而不是通常使用一個文件作為輸入。另外,它會考慮數據的存儲位置。

 

     目前很多公司采用的方法就是在數據進入 Hadoop 的 HDFS 系統之前進行合並(也是本博文這方法),一般效果較上述三種方法明顯。

 

 

 

 

 

 

 

 

 

 

 

 

 代碼版本1

MergeSmallFilesToHDFS .java
package zhouls.bigdata.myMapReduce.MergeSmallFiles;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IOUtils;
/**
* function 合並小文件至 HDFS 
* 
*
*/
public class MergeSmallFilesToHDFS {
private static FileSystem fs = null;
private static FileSystem local = null;
/**
* @function main 
* @param args
* @throws IOException
* @throws URISyntaxException
*/
public static void main(String[] args) throws IOException,
URISyntaxException {
list();
}

/**
* 
* @throws IOException
* @throws URISyntaxException
*/
public static void list() throws IOException, URISyntaxException {
// 讀取hadoop文件系統的配置
Configuration conf = new Configuration();
//文件系統訪問接口
URI uri = new URI("hdfs://HadoopMaster:9000");
//創建FileSystem對象aa
fs = FileSystem.get(uri, conf);
// 獲得本地文件系統
local = FileSystem.getLocal(conf);
//過濾目錄下的 svn 文件
FileStatus[] dirstatus = local.globStatus(new Path("./data/mergeSmallFiles/*"),new RegexExcludePathFilter("^.*svn$"));
//獲取73目錄下的所有文件路徑
Path[] dirs = FileUtil.stat2Paths(dirstatus);
FSDataOutputStream out = null;
FSDataInputStream in = null;
for (Path dir : dirs) {
String fileName = dir.getName().replace("-", "");//文件名稱
//只接受日期目錄下的.txt文件a
FileStatus[] localStatus = local.globStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));
// 獲得日期目錄下的所有文件
Path[] listedPaths = FileUtil.stat2Paths(localStatus);
//輸出路徑
Path block = new Path("hdfs://HadoopMaster:9000/tv/"+ fileName + ".txt");
// 打開輸出流
out = fs.create(block);    
for (Path p : listedPaths) {
in = local.open(p);// 打開輸入流
IOUtils.copyBytes(in, out, 4096, false); // 復制數據
// 關閉輸入流
in.close();
}
if (out != null) {
// 關閉輸出流a
out.close();
}
}

}

/**
* 
* @function 過濾 regex 格式的文件
*
*/
public static class RegexExcludePathFilter implements PathFilter {
private final String regex;

public RegexExcludePathFilter(String regex) {
this.regex = regex;
}

@Override
public boolean accept(Path path) {
// TODO Auto-generated method stub
boolean flag = path.toString().matches(regex);
return !flag;
}

}

/**
* 
* @function 接受 regex 格式的文件
*
*/
public static class RegexAcceptPathFilter implements PathFilter {
private final String regex;

public RegexAcceptPathFilter(String regex) {
this.regex = regex;
}

@Override
public boolean accept(Path path) {
// TODO Auto-generated method stub
boolean flag = path.toString().matches(regex);
return flag;
}

}
}

 

 

 

 

 

 

 

 

 

 代碼版本2

MergeSmallFilesToHDFS .java
package zhouls.bigdata.myMapReduce.MergeSmallFiles;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.io.IOUtils;
/**
 * function 合並小文件至 HDFS     ,  文件與塊大小(比如128M)來比,小的話,稱為小文件。是一個相對概念!相對於數據塊而言的!
 * @author 小講
 *
 */
public class MergeSmallFilesToHDFS {
    private static FileSystem fs = null;
    private static FileSystem local = null;
    /**
     * @function main 
     * @param args
     * @throws IOException
     * @throws URISyntaxException
     */
    public static void main(String[] args) throws IOException,
            URISyntaxException {
        list();
    }

    /**
     * 
     * @throws IOException
     * @throws URISyntaxException
     */
    public static void list() throws IOException, URISyntaxException {
        // 讀取hadoop文件系統的配置
        Configuration conf = new Configuration();
        //文件系統訪問接口
        URI uri = new URI("hdfs://master:9000");
        
//        URL、URI與Path三者的區別
//        Hadoop文件系統中通過Hadoop Path對象來代表一個文件    
//        URL(相當於絕對路徑)    ->   (文件) ->    URI(相當於相對路徑,即代表URL前面的那一部分)
//        URI:如hdfs://master:9000
//        如,URL.openStream
        
        
        
        //獲得FileSystem實例,即HDFS
        fs = FileSystem.get(uri, conf);
            
        
        //獲得FileSystem實例,即Local
        local = FileSystem.getLocal(conf);
//            為什么要獲取到Local呢,因為,我們要把本地D盤下data/73目錄下的文件要合並后,上傳到HDFS里,所以,我們需先獲取到Local,再來做合並工作啦!
        
        
        //過濾目錄下的 svn 文件,globStatus從第一個參數通配符合到文件,剔除滿足第二個參數到結果,因為PathFilter中accept是return!  
        FileStatus[] dirstatus = local.globStatus(new Path("D://data/73/*"),new RegexExcludePathFilter("^.*svn$"));//一般這是隱藏文件,所以得排除
//        ^表示匹配我們字符串開始的位置               *代表0到多個字符                        $代表字符串結束的位置
//        RegexExcludePathFilter來只排除我們不需要的,即svn格式
//        RegexExcludePathFilter這個方法我們自己寫
        
//        但是我們,最終是要處理文件里的東西,最終是要轉成Path類型,因為Path對象f,它對應着一個文件。
        
        //獲取73目錄下的所有文件路徑,注意FIleUtil中stat2Paths()的使用,它將一個FileStatus對象數組轉換為Path對象數組。
        Path[] dirs = FileUtil.stat2Paths(dirstatus);//dirstatus是FileStatus數組類型
        
        
        FSDataOutputStream out = null;//輸出流
        FSDataInputStream in = null;//輸入流
        
//        很多人搞不清輸入流和輸出流,!!!!
//        其實啊,輸入流、輸出流都是針對內存的
//        往內存里寫,是輸入流。
//        內存往文件里寫,是輸出Luis。
//        
//        比如一個文件A復制到另一文件B,那么,先寫到內存里,再寫到文件B。
//           =>   則文件A寫到內存里,叫輸入流。
//           =>    則內存里寫到文件B,叫輸出流    
        
        
        for (Path dir : dirs) {//for星型循環,即將dirs是Path對象數組,一一傳給Path dir
            
            String fileName = dir.getName().replace("-", "");//文件名稱
//                        即獲取到如2012-09-17,然后經過replace("-", ""),得到20120917
            
            
            //只接受日期目錄下的.txt文件,^匹配輸入字符串的開始位置,$匹配輸入字符串的結束位置,*匹配0個或多個字符。
            FileStatus[] localStatus = local.globStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));
//            FileStatus[] localStatus = local.listStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));//試試,看有什么區別?出現錯誤的!為什么?

    
                    //RegexAcceptPathFilter這個方法,我們自己寫
//            RegexAcceptPathFilter來只接收我們需要,即txt格式
//            這里,我們還可以只接收別的格式,自己去改,一定要鍛煉學會改別人的代碼
            
            
            // 獲得如2012-09-17日期目錄下的所有文件
            Path[] listedPaths = FileUtil.stat2Paths(localStatus);
//            同樣,但是我們,最終是要處理文件里的東西,最終是要轉成Path類型,因為Path對象f,它對應着一個文件。
            
            
            
            //輸出路徑
            Path block = new Path("hdfs://master:9000/outData/MergeSmallFilesToHDFS/"+ fileName + ".txt");
            
            
            // 打開輸出流
            out = fs.create(block);//因為,合並小文件之后,比如這是,合並2012-09-17日期目錄下的所有小文件,之后,要上傳到HDFS里。
//                類似於,文件A寫到內存里,再內存里寫到文件B。而這行代碼out = fs.create(block);是相當於是,內存里寫到文件B。所以是輸出流,即是從內存里輸出的,所以叫輸出流。
//                            這里,文件A是Local                文件B是HDFS

//                                        文件與塊大小(比如128M)來比,小的話,稱為小文件。是一個相對概念!相對於數據塊而言的!
            
//            很多人搞不清輸入流和輸出流,!!!!
//            其實啊,輸入流、輸出流都是針對內存的
//            往內存里寫,是輸入流。
//            內存往文件里寫,是輸出Luis。
//            
//            比如一個文件A復制到另一文件B,那么,先寫到內存里,再寫到文件B。
//               =>   則文件A寫到內存里,叫輸入流。
//               =>    則內存里寫到文件B,叫輸出流    
            
            
            for (Path p : listedPaths) {//for星型循環,即將listedPaths的值一一傳給Path p
                in = local.open(p);// 打開輸入流in
//                類似於,文件A寫到內存里,再內存里寫到文件B。而這行代碼in = local.open(p);是相當於是,文件A寫到內存里。所以是輸如流,即是寫到內存里的,所以叫輸入流。
//                    這里,文件A是Local                文件B是HDFS
                
                IOUtils.copyBytes(in, out, 4096, false); // 復制數據,IOUtils.copyBytes可以方便地將數據寫入到文件,不需要自己去控制緩沖區,也不用自己去循環讀取輸入源。false表示不自動關閉數據流,那么就手動關閉。
//                IOUtils.copyBytes這個方法很重要
                                //是否自動關閉輸入流和輸出流,若是false,就要單獨去關閉。則不在這個方法體里關閉輸入和輸出流了。
//                                                     若是true,則在這個方法里關閉輸入和輸出流。不需單獨去關閉了
                
                
//                明白,IOUtils類的copyBytes將hdfs數據流拷貝到標准輸出流System.out中,
//                copyBytes前兩個參數好理解,一個輸入,一個輸出,第三個是緩存大小,第四個指定拷貝完畢后是否關閉流。
//                要設置為false,標准輸出流不關閉,我們要手動關閉輸入流。即,設置為false表示關閉輸入流
                
//                主要是把最后的這個參數定義好, 就可以了。 定義為true還是false,則決定着是否在這個方法體里關閉
//                若定義為true,則在這個方法體里直接關閉輸入流、輸出流。不需單獨去關閉了
//                若定義為false,則不在這個方法體里直接關閉輸入流、輸出流。需單獨去關閉了
                
                
                // 關閉輸入流
                in.close();//若定義為false,則不在這個方法體里直接關閉輸入流、輸出流。需單獨去關閉了。這就是單獨在關閉輸入流!!!懂了嗎
            }
            if (out != null) {//這里為什么不為空,空指針,則說明里面還有資源。
                // 關閉輸出流
                out.close();//若定義為false,則不在這個方法體里直接關閉輸入流、輸出流。需單獨去關閉了。這就是單獨在關閉輸出流!!!懂了嗎
            }
        }
        
    }

    /**
     * 
     * @function 過濾 regex 格式的文件
     *
     */
    public static class RegexExcludePathFilter implements PathFilter {
        private final String regex;//變量

        public RegexExcludePathFilter(String regex) {//這個是上面的那個,正在表達式
            this.regex = regex;//將String regex的值,賦給RegexExcludePathFilter類里的private final String regex的值
        }

        public boolean accept(Path path) {//主要是實現accept方法
            // TODO Auto-generated method stub
            boolean flag = path.toString().matches(regex);//匹配正則表達式,這里是^.*svn$
            return !flag;//如果要接收 regex 格式的文件,則accept()方法就return flag; 如果想要過濾掉regex格式的文件,則accept()方法就return !flag。
        }

    }

    /**
     * 
     * @function 接受 regex 格式的文件
     *
     */
    public static class RegexAcceptPathFilter implements PathFilter {
        private final String regex;//變量

        public RegexAcceptPathFilter(String regex) {//這個是上面的那個,正在表達式
            this.regex = regex;//將String regex的值,賦給RegexAcceptPathFilter類里的private final String regex的值
        }

        public boolean accept(Path path) {//主要是實現accept方法
            // TODO Auto-generated method stub
            boolean flag = path.toString().matches(regex);//匹配正則表達式,這里是^.*txt$
            return flag;//如果要接收 regex 格式的文件,則accept()方法就return flag; 如果想要過濾掉regex格式的文件,則accept()方法就return !flag。
        }

    }
}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM