在本地文件上傳至HDFS過程中,很多情況下一個目錄包含很多個文件,而我們需要對這些文件進行篩選,選出符合我們要求的文件,上傳至HDFS。這時就需要我們用到文件模式。 在項目開始前,我們先掌握文件模式
1、文件模式
在某個單一操作中處理一系列文件是很常見的。例如一個日志處理的MapReduce作業可能要分析一個月的日志量。如果一個文件一個文件或者一個目錄一個目錄的聲明那就太麻煩了,我們可以使用通配符(wild card)來匹配多個文件(這個操作也叫做globbing)。
Hadoop提供了兩種方法來處理文件組:
1 public FileStatus[] globStatus(Path pathPattern) throws IOException; 2 3 public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException;
- PathFilter
使用文件模式有時候並不能有效的描述你想要的一系列文件,例如如果你想排除某個特定文件就很難。所以FileSystem的listStatus()和globStatus()方法就提供了一個可選參數:PathFilter——它允許你一些更細化的控制匹配:
1 package org.apache.hadoop.fs; 2 3 public interface PathFilter 4 { 5 boolean accept(Path path); 6 }
- Hadoop中的匹配符與Unix中bash相同,如下圖所示:、
2、數據
我們利用通配符和PathFilter 對象,將本地多種格式的文件上傳至 HDFS,並過濾掉txt文本格式以外的文件
數據我隨便造了些,如下
3、分析
基於需求,我們通過以下兩步完成:
1、首先使用globStatus(Path pathPattern, PathFilter filter),完成文件格式過濾,獲取所有 txt 格式的文件。
2、然后使用 Java API 接口 copyFromLocalFile,將所有 txt 格式的文件上傳至 HDFS
4、實現
首先定義一個類 RegexAcceptPathFilter實現 PathFilter,過濾掉 txt 文本格式以外的文件。
1 /** 2 * @ProjectName PathFilter 3 * @PackageName com.buaa 4 * @ClassName RegexAcceptPathFilter 5 * @Description 只接受符合regex的文件 6 * @Author 劉吉超 7 * @Date 2016-04-15 20:39:21 8 */ 9 public static class RegexAcceptPathFilter implements PathFilter { 10 private final String regex; 11 12 public RegexAcceptPathFilter(String regex) { 13 this.regex = regex; 14 } 15 16 @Override 17 public boolean accept(Path path) { 18 boolean flag = path.toString().matches(regex); 19 // 只接受符合regex的文件 20 return flag; 21 } 22 }
如果要接收 regex格式的文件,則accept()方法就return flag; 如果想要過濾掉regex格式的文件,則accept()方法就return !flag。
接下來在 uploadFile方法中,使用globStatus方法獲取所有txt文件,然后通過copyFromLocalFile方法將文件上傳至HDFS。
1 /** 2 * 過濾文件格式 將多個文件上傳至 HDFS 3 * 4 * @param srcPath 源路徑 5 * @param destPath 目標路徑 6 * @param filter 正則 7 * @throws URISyntaxException 8 * @throws IOException 9 */ 10 public static void uploadFile(String srcPath,String destPath,String filter) throws URISyntaxException, IOException { 11 // 讀取配置文件 12 Configuration conf = new Configuration(); 13 // 遠端文件系統 14 URI uri = new URI(HDFSUri.trim()); 15 FileSystem remote = FileSystem.get(uri,conf);; 16 // 獲得本地文件系統 17 FileSystem local = FileSystem.getLocal(conf); 18 19 // 只上傳srcPath目錄下符合filter條件的文件 20 FileStatus[] localStatus = local.globStatus(new Path(srcPath), new RegexAcceptPathFilter(filter)); 21 // 獲得所有文件路徑 22 Path[] listedPaths = FileUtil.stat2Paths(localStatus); 23 24 if(listedPaths != null){ 25 for(Path path : listedPaths){ 26 // 將本地文件上傳到HDFS 27 remote.copyFromLocalFile(path, new Path(HDFSUri + destPath)); 28 } 29 } 30 }
在 main() 方法在調用 uploadFile,執行多文件上傳至 HDFS
1 public static void main(String[] args) throws IOException,URISyntaxException { 2 // 第一個參數:代表是源路徑 3 // 第二個參數:代表是目錄路徑 4 // 第三個參數:代表是正則,這里我們只有上傳txt文件,所以正則是^.*txt$ 5 uploadFile("D:\\data\\*","/buaa/data","^.*txt$"); 6 }