不多說,直接上代碼。
Hadoop 自身提供了幾種機制來解決相關的問題,包括HAR,SequeueFile和CombineFileInputFormat。
Hadoop 自身提供的幾種小文件合並機制
Hadoop HAR
將眾多小文件打包成一個大文件進行存儲,並且打包后原來的文件仍然可以通過Map-reduce進行操作,打包后的文件由索引和存儲兩大部分組成
缺點:一旦創建就不能修改,也不支持追加操作,還不支持文檔壓縮,當有新文件進來以后,需要重新打包。
SequeuesFile
Sequence file由一系列的二進制key/value組成,如果key為小文件名,value為文件內容,則可以將大批小文件合並成一個大文件。
優缺點:對小文件的存取都比較自由,也不限制用戶和文件的多少,但是該方法不能使用append方法,所以適合一次性寫入大量小文件的場景。
CombineFileInputFormat
CombineFileInputFormat是一種新的inputformat,用於將多個文件合並成一個單獨的split作為輸入,而不是通常使用一個文件作為輸入。另外,它會考慮數據的存儲位置。


目前很多公司采用的方法就是在數據進入 Hadoop 的 HDFS 系統之前進行合並(也是本博文這方法),一般效果較上述三種方法明顯。





代碼版本1
MergeSmallFilesToHDFS .java
package zhouls.bigdata.myMapReduce.MergeSmallFiles; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.IOUtils; /** * function 合並小文件至 HDFS * * */ public class MergeSmallFilesToHDFS { private static FileSystem fs = null; private static FileSystem local = null; /** * @function main * @param args * @throws IOException * @throws URISyntaxException */ public static void main(String[] args) throws IOException, URISyntaxException { list(); } /** * * @throws IOException * @throws URISyntaxException */ public static void list() throws IOException, URISyntaxException { // 讀取hadoop文件系統的配置 Configuration conf = new Configuration(); //文件系統訪問接口 URI uri = new URI("hdfs://HadoopMaster:9000"); //創建FileSystem對象aa fs = FileSystem.get(uri, conf); // 獲得本地文件系統 local = FileSystem.getLocal(conf); //過濾目錄下的 svn 文件 FileStatus[] dirstatus = local.globStatus(new Path("./data/mergeSmallFiles/*"),new RegexExcludePathFilter("^.*svn$")); //獲取73目錄下的所有文件路徑 Path[] dirs = FileUtil.stat2Paths(dirstatus); FSDataOutputStream out = null; FSDataInputStream in = null; for (Path dir : dirs) { String fileName = dir.getName().replace("-", "");//文件名稱 //只接受日期目錄下的.txt文件a FileStatus[] localStatus = local.globStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$")); // 獲得日期目錄下的所有文件 Path[] listedPaths = FileUtil.stat2Paths(localStatus); //輸出路徑 Path block = new Path("hdfs://HadoopMaster:9000/tv/"+ fileName + ".txt"); // 打開輸出流 out = fs.create(block); for (Path p : listedPaths) { in = local.open(p);// 打開輸入流 IOUtils.copyBytes(in, out, 4096, false); // 復制數據 // 關閉輸入流 in.close(); } if (out != null) { // 關閉輸出流a out.close(); } } } /** * * @function 過濾 regex 格式的文件 * */ public static class RegexExcludePathFilter implements PathFilter { private final String regex; public RegexExcludePathFilter(String regex) { this.regex = regex; } @Override public boolean accept(Path path) { // TODO Auto-generated method stub boolean flag = path.toString().matches(regex); return !flag; } } /** * * @function 接受 regex 格式的文件 * */ public static class RegexAcceptPathFilter implements PathFilter { private final String regex; public RegexAcceptPathFilter(String regex) { this.regex = regex; } @Override public boolean accept(Path path) { // TODO Auto-generated method stub boolean flag = path.toString().matches(regex); return flag; } } }
代碼版本2
MergeSmallFilesToHDFS .java
package zhouls.bigdata.myMapReduce.MergeSmallFiles; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.IOUtils; /** * function 合並小文件至 HDFS , 文件與塊大小(比如128M)來比,小的話,稱為小文件。是一個相對概念!相對於數據塊而言的! * @author 小講 * */ public class MergeSmallFilesToHDFS { private static FileSystem fs = null; private static FileSystem local = null; /** * @function main * @param args * @throws IOException * @throws URISyntaxException */ public static void main(String[] args) throws IOException, URISyntaxException { list(); } /** * * @throws IOException * @throws URISyntaxException */ public static void list() throws IOException, URISyntaxException { // 讀取hadoop文件系統的配置 Configuration conf = new Configuration(); //文件系統訪問接口 URI uri = new URI("hdfs://master:9000"); // URL、URI與Path三者的區別 // Hadoop文件系統中通過Hadoop Path對象來代表一個文件 // URL(相當於絕對路徑) -> (文件) -> URI(相當於相對路徑,即代表URL前面的那一部分) // URI:如hdfs://master:9000 // 如,URL.openStream //獲得FileSystem實例,即HDFS fs = FileSystem.get(uri, conf); //獲得FileSystem實例,即Local local = FileSystem.getLocal(conf); // 為什么要獲取到Local呢,因為,我們要把本地D盤下data/73目錄下的文件要合並后,上傳到HDFS里,所以,我們需先獲取到Local,再來做合並工作啦! //過濾目錄下的 svn 文件,globStatus從第一個參數通配符合到文件,剔除滿足第二個參數到結果,因為PathFilter中accept是return! FileStatus[] dirstatus = local.globStatus(new Path("D://data/73/*"),new RegexExcludePathFilter("^.*svn$"));//一般這是隱藏文件,所以得排除 // ^表示匹配我們字符串開始的位置 *代表0到多個字符 $代表字符串結束的位置 // RegexExcludePathFilter來只排除我們不需要的,即svn格式 // RegexExcludePathFilter這個方法我們自己寫 // 但是我們,最終是要處理文件里的東西,最終是要轉成Path類型,因為Path對象f,它對應着一個文件。 //獲取73目錄下的所有文件路徑,注意FIleUtil中stat2Paths()的使用,它將一個FileStatus對象數組轉換為Path對象數組。 Path[] dirs = FileUtil.stat2Paths(dirstatus);//dirstatus是FileStatus數組類型 FSDataOutputStream out = null;//輸出流 FSDataInputStream in = null;//輸入流 // 很多人搞不清輸入流和輸出流,!!!! // 其實啊,輸入流、輸出流都是針對內存的 // 往內存里寫,是輸入流。 // 內存往文件里寫,是輸出Luis。 // // 比如一個文件A復制到另一文件B,那么,先寫到內存里,再寫到文件B。 // => 則文件A寫到內存里,叫輸入流。 // => 則內存里寫到文件B,叫輸出流 for (Path dir : dirs) {//for星型循環,即將dirs是Path對象數組,一一傳給Path dir String fileName = dir.getName().replace("-", "");//文件名稱 // 即獲取到如2012-09-17,然后經過replace("-", ""),得到20120917 //只接受日期目錄下的.txt文件,^匹配輸入字符串的開始位置,$匹配輸入字符串的結束位置,*匹配0個或多個字符。 FileStatus[] localStatus = local.globStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$")); // FileStatus[] localStatus = local.listStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$"));//試試,看有什么區別?出現錯誤的!為什么? //RegexAcceptPathFilter這個方法,我們自己寫 // RegexAcceptPathFilter來只接收我們需要,即txt格式 // 這里,我們還可以只接收別的格式,自己去改,一定要鍛煉學會改別人的代碼 // 獲得如2012-09-17日期目錄下的所有文件 Path[] listedPaths = FileUtil.stat2Paths(localStatus); // 同樣,但是我們,最終是要處理文件里的東西,最終是要轉成Path類型,因為Path對象f,它對應着一個文件。 //輸出路徑 Path block = new Path("hdfs://master:9000/outData/MergeSmallFilesToHDFS/"+ fileName + ".txt"); // 打開輸出流 out = fs.create(block);//因為,合並小文件之后,比如這是,合並2012-09-17日期目錄下的所有小文件,之后,要上傳到HDFS里。 // 類似於,文件A寫到內存里,再內存里寫到文件B。而這行代碼out = fs.create(block);是相當於是,內存里寫到文件B。所以是輸出流,即是從內存里輸出的,所以叫輸出流。 // 這里,文件A是Local 文件B是HDFS // 文件與塊大小(比如128M)來比,小的話,稱為小文件。是一個相對概念!相對於數據塊而言的! // 很多人搞不清輸入流和輸出流,!!!! // 其實啊,輸入流、輸出流都是針對內存的 // 往內存里寫,是輸入流。 // 內存往文件里寫,是輸出Luis。 // // 比如一個文件A復制到另一文件B,那么,先寫到內存里,再寫到文件B。 // => 則文件A寫到內存里,叫輸入流。 // => 則內存里寫到文件B,叫輸出流 for (Path p : listedPaths) {//for星型循環,即將listedPaths的值一一傳給Path p in = local.open(p);// 打開輸入流in // 類似於,文件A寫到內存里,再內存里寫到文件B。而這行代碼in = local.open(p);是相當於是,文件A寫到內存里。所以是輸如流,即是寫到內存里的,所以叫輸入流。 // 這里,文件A是Local 文件B是HDFS IOUtils.copyBytes(in, out, 4096, false); // 復制數據,IOUtils.copyBytes可以方便地將數據寫入到文件,不需要自己去控制緩沖區,也不用自己去循環讀取輸入源。false表示不自動關閉數據流,那么就手動關閉。 // IOUtils.copyBytes這個方法很重要 //是否自動關閉輸入流和輸出流,若是false,就要單獨去關閉。則不在這個方法體里關閉輸入和輸出流了。 // 若是true,則在這個方法里關閉輸入和輸出流。不需單獨去關閉了 // 明白,IOUtils類的copyBytes將hdfs數據流拷貝到標准輸出流System.out中, // copyBytes前兩個參數好理解,一個輸入,一個輸出,第三個是緩存大小,第四個指定拷貝完畢后是否關閉流。 // 要設置為false,標准輸出流不關閉,我們要手動關閉輸入流。即,設置為false表示關閉輸入流 // 主要是把最后的這個參數定義好, 就可以了。 定義為true還是false,則決定着是否在這個方法體里關閉 // 若定義為true,則在這個方法體里直接關閉輸入流、輸出流。不需單獨去關閉了 // 若定義為false,則不在這個方法體里直接關閉輸入流、輸出流。需單獨去關閉了 // 關閉輸入流 in.close();//若定義為false,則不在這個方法體里直接關閉輸入流、輸出流。需單獨去關閉了。這就是單獨在關閉輸入流!!!懂了嗎 } if (out != null) {//這里為什么不為空,空指針,則說明里面還有資源。 // 關閉輸出流 out.close();//若定義為false,則不在這個方法體里直接關閉輸入流、輸出流。需單獨去關閉了。這就是單獨在關閉輸出流!!!懂了嗎 } } } /** * * @function 過濾 regex 格式的文件 * */ public static class RegexExcludePathFilter implements PathFilter { private final String regex;//變量 public RegexExcludePathFilter(String regex) {//這個是上面的那個,正在表達式 this.regex = regex;//將String regex的值,賦給RegexExcludePathFilter類里的private final String regex的值 } public boolean accept(Path path) {//主要是實現accept方法 // TODO Auto-generated method stub boolean flag = path.toString().matches(regex);//匹配正則表達式,這里是^.*svn$ return !flag;//如果要接收 regex 格式的文件,則accept()方法就return flag; 如果想要過濾掉regex格式的文件,則accept()方法就return !flag。 } } /** * * @function 接受 regex 格式的文件 * */ public static class RegexAcceptPathFilter implements PathFilter { private final String regex;//變量 public RegexAcceptPathFilter(String regex) {//這個是上面的那個,正在表達式 this.regex = regex;//將String regex的值,賦給RegexAcceptPathFilter類里的private final String regex的值 } public boolean accept(Path path) {//主要是實現accept方法 // TODO Auto-generated method stub boolean flag = path.toString().matches(regex);//匹配正則表達式,這里是^.*txt$ return flag;//如果要接收 regex 格式的文件,則accept()方法就return flag; 如果想要過濾掉regex格式的文件,則accept()方法就return !flag。 } } }
