HDFS操作及小文件合並


 

 

 

小文件合並是針對文件上傳到HDFS之前

這些文件夾里面都是小文件

參考代碼

package com.gong.hadoop2; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.io.IOUtils; /** * function 合並小文件至 HDFS * @author 小講 * */
public class MergeSmallFilesToHDFS { private static FileSystem fs = null; private static FileSystem local = null; /** * @function main * @param args * @throws IOException * @throws URISyntaxException */
    public static void main(String[] args) throws IOException, URISyntaxException { list(); } /** * * @throws IOException * @throws URISyntaxException */
    public static void list() throws IOException, URISyntaxException { // 讀取hadoop文件系統的配置
        Configuration conf = new Configuration(); //文件系統訪問接口
        URI uri = new URI("hdfs://dajiangtai:9000"); //創建FileSystem對象
        fs = FileSystem.get(uri, conf); // 獲得本地文件系統
        local = FileSystem.getLocal(conf); //過濾目錄下的 svn 文件,globStatus從第一個參數通配符合到文件,剔除滿足第二個參數到結果,因為PathFilter中accept是return! 
        FileStatus[] dirstatus = local.globStatus(new Path("D://data/73/*"),new RegexExcludePathFilter("^.*svn$")); //獲取73目錄下的所有文件路徑,注意FIleUtil中stat2Paths()的使用,它將一個FileStatus對象數組轉換為Path對象數組。
        Path[] dirs = FileUtil.stat2Paths(dirstatus); FSDataOutputStream out = null; FSDataInputStream in = null; for (Path dir : dirs) { String fileName = dir.getName().replace("-", "");//文件名稱 //只接受日期目錄下的.txt文件,^匹配輸入字符串的開始位置,$匹配輸入字符串的結束位置,*匹配0個或多個字符。
            FileStatus[] localStatus = local.globStatus(new Path(dir+"/*"),new RegexAcceptPathFilter("^.*txt$")); // 獲得日期目錄下的所有文件
            Path[] listedPaths = FileUtil.stat2Paths(localStatus); //輸出路徑
            Path block = new Path("hdfs://dajiangtai:9000/middle/tv/"+ fileName + ".txt"); // 打開輸出流
            out = fs.create(block); for (Path p : listedPaths) { in = local.open(p);// 打開輸入流
                IOUtils.copyBytes(in, out, 4096, false); // 復制數據,IOUtils.copyBytes可以方便地將數據寫入到文件,不需要自己去控制緩沖區,也不用自己去循環讀取輸入源。false表示不自動關閉數據流,那么就手動關閉。 // 關閉輸入流
                in.close(); } if (out != null) { // 關閉輸出流
                out.close(); } } } /** * * @function 過濾 regex 格式的文件 * */
    public static class RegexExcludePathFilter implements PathFilter { private final String regex; public RegexExcludePathFilter(String regex) { this.regex = regex; } @Override public boolean accept(Path path) { // TODO Auto-generated method stub
            boolean flag = path.toString().matches(regex); return !flag; } } /** * * @function 接受 regex 格式的文件 * */
    public static class RegexAcceptPathFilter implements PathFilter { private final String regex; public RegexAcceptPathFilter(String regex) { this.regex = regex; } @Override public boolean accept(Path path) { // TODO Auto-generated method stub
            boolean flag = path.toString().matches(regex); return flag; } } }

 

最后一點,分清楚hadoop fs 和dfs的區別

 

hadoop fs <args>

FS涉及可以指向任何文件系統(如本地,HDFS等)的通用文件系統。因此,當您處理不同的文件系統(如本地FS,HFTP FS,S3 FS等)時,可以使用它

 

hadoop dfs <args>

dfs非常具體到HDFS。 將工作與HDFS有關。 這已被棄用,我們應該使用hdfs dfs。


hdfs dfs <args>

與第二個相同,即適用於與HDFS相關的所有操作,並且是推薦的命令,而不是hadoop dfs

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM