HDFS設計的主要目的是對海量數據進行存儲,也就是說在其上能夠存儲很大量文件(可以存儲TB級的文件)。HDFS將這些文件分割之后,存儲在不同的DataNode上, HDFS 提供了兩種訪問接口:Shell接口和Java API 接口,對HDFS里面的文件進行操作,具體每個Block放在哪台DataNode上面,對於開發者來說是透明的。
1、獲取文件系統
1 /**
2 * 獲取文件系統
3 *
4 * @return FileSystem
5 */
6 public static FileSystem getFileSystem() {
7 //讀取配置文件
8 Configuration conf = new Configuration();
9 // 文件系統
10 FileSystem fs = null;
11
12 String hdfsUri = HDFSUri;
13 if(StringUtils.isBlank(hdfsUri)){
14 // 返回默認文件系統 如果在 Hadoop集群下運行,使用此種方法可直接獲取默認文件系統
15 try {
16 fs = FileSystem.get(conf);
17 } catch (IOException e) {
18 logger.error("", e);
19 }
20 }else{
21 // 返回指定的文件系統,如果在本地測試,需要使用此種方法獲取文件系統
22 try {
23 URI uri = new URI(hdfsUri.trim());
24 fs = FileSystem.get(uri,conf);
25 } catch (URISyntaxException | IOException e) {
26 logger.error("", e);
27 }
28 }
29
30 return fs;
31 }
2、創建文件目錄
1 /**
2 * 創建文件目錄
3 *
4 * @param path
5 */
6 public static void mkdir(String path) {
7 try {
8 // 獲取文件系統
9 FileSystem fs = getFileSystem();
10
11 String hdfsUri = HDFSUri;
12 if(StringUtils.isNotBlank(hdfsUri)){
13 path = hdfsUri + path;
14 }
15
16 // 創建目錄
17 fs.mkdirs(new Path(path));
18
19 //釋放資源
20 fs.close();
21 } catch (IllegalArgumentException | IOException e) {
22 logger.error("", e);
23 }
24 }
3、刪除文件或者文件目錄
1 /**
2 * 刪除文件或者文件目錄
3 *
4 * @param path
5 */
6 public static void rmdir(String path) {
7 try {
8 // 返回FileSystem對象
9 FileSystem fs = getFileSystem();
10
11 String hdfsUri = HDFSUri;
12 if(StringUtils.isNotBlank(hdfsUri)){
13 path = hdfsUri + path;
14 }
15
16 // 刪除文件或者文件目錄 delete(Path f) 此方法已經棄用
17 fs.delete(new Path(path),true);
18
19 // 釋放資源
20 fs.close();
21 } catch (IllegalArgumentException | IOException e) {
22 logger.error("", e);
23 }
24 }
3、根據filter獲取目錄下的文件
1 /**
2 * 根據filter獲取目錄下的文件
3 *
4 * @param path
5 * @param pathFilter
6 * @return String[]
7 */
8 public static String[] ListFile(String path,PathFilter pathFilter) {
9 String[] files = new String[0];
10
11 try {
12 // 返回FileSystem對象
13 FileSystem fs = getFileSystem();
14
15 String hdfsUri = HDFSUri;
16 if(StringUtils.isNotBlank(hdfsUri)){
17 path = hdfsUri + path;
18 }
19
20 FileStatus[] status;
21 if(pathFilter != null){
22 // 根據filter列出目錄內容
23 status = fs.listStatus(new Path(path),pathFilter);
24 }else{
25 // 列出目錄內容
26 status = fs.listStatus(new Path(path));
27 }
28
29 // 獲取目錄下的所有文件路徑
30 Path[] listedPaths = FileUtil.stat2Paths(status);
31 // 轉換String[]
32 if (listedPaths != null && listedPaths.length > 0){
33 files = new String[listedPaths.length];
34 for (int i = 0; i < files.length; i++){
35 files[i] = listedPaths[i].toString();
36 }
37 }
38 // 釋放資源
39 fs.close();
40 } catch (IllegalArgumentException | IOException e) {
41 logger.error("", e);
42 }
43
44 return files;
45 }
4、文件上傳至 HDFS
1 /**
2 * 文件上傳至 HDFS
3 *
4 * @param delSrc
5 * @param overwrite
6 * @param srcFile
7 * @param destPath
8 */
9 public static void copyFileToHDFS(boolean delSrc, boolean overwrite,String srcFile,String destPath) {
10 // 源文件路徑是Linux下的路徑,如果在 windows 下測試,需要改寫為Windows下的路徑,比如D://hadoop/djt/weibo.txt
11 Path srcPath = new Path(srcFile);
12
13 // 目的路徑
14 String hdfsUri = HDFSUri;
15 if(StringUtils.isNotBlank(hdfsUri)){
16 destPath = hdfsUri + destPath;
17 }
18 Path dstPath = new Path(destPath);
19
20 // 實現文件上傳
21 try {
22 // 獲取FileSystem對象
23 FileSystem fs = getFileSystem();
24 fs.copyFromLocalFile(srcPath, dstPath);
25 fs.copyFromLocalFile(delSrc,overwrite,srcPath, dstPath);
26 //釋放資源
27 fs.close();
28 } catch (IOException e) {
29 logger.error("", e);
30 }
31 }
5、從 HDFS 下載文件
1 /**
2 * 從 HDFS 下載文件
3 *
4 * @param srcFile
5 * @param destPath
6 */
7 public static void getFile(String srcFile,String destPath) {
8 // 源文件路徑
9 String hdfsUri = HDFSUri;
10 if(StringUtils.isNotBlank(hdfsUri)){
11 srcFile = hdfsUri + srcFile;
12 }
13 Path srcPath = new Path(srcFile);
14
15 // 目的路徑是Linux下的路徑,如果在 windows 下測試,需要改寫為Windows下的路徑,比如D://hadoop/djt/
16 Path dstPath = new Path(destPath);
17
18 try {
19 // 獲取FileSystem對象
20 FileSystem fs = getFileSystem();
21 // 下載hdfs上的文件
22 fs.copyToLocalFile(srcPath, dstPath);
23 // 釋放資源
24 fs.close();
25 } catch (IOException e) {
26 logger.error("", e);
27 }
28 }
6、獲取 HDFS 集群節點信息
1 /**
2 * 獲取 HDFS 集群節點信息
3 *
4 * @return DatanodeInfo[]
5 */
6 public static DatanodeInfo[] getHDFSNodes() {
7 // 獲取所有節點
8 DatanodeInfo[] dataNodeStats = new DatanodeInfo[0];
9
10 try {
11 // 返回FileSystem對象
12 FileSystem fs = getFileSystem();
13
14 // 獲取分布式文件系統
15 DistributedFileSystem hdfs = (DistributedFileSystem)fs;
16
17 dataNodeStats = hdfs.getDataNodeStats();
18 } catch (IOException e) {
19 logger.error("", e);
20 }
21 return dataNodeStats;
22 }
7、查找某個文件在 HDFS集群的位置
1 /**
2 * 查找某個文件在 HDFS集群的位置
3 *
4 * @param filePath
5 * @return BlockLocation[]
6 */
7 public static BlockLocation[] getFileBlockLocations(String filePath) {
8 // 文件路徑
9 String hdfsUri = HDFSUri;
10 if(StringUtils.isNotBlank(hdfsUri)){
11 filePath = hdfsUri + filePath;
12 }
13 Path path = new Path(filePath);
14
15 // 文件塊位置列表
16 BlockLocation[] blkLocations = new BlockLocation[0];
17 try {
18 // 返回FileSystem對象
19 FileSystem fs = getFileSystem();
20 // 獲取文件目錄
21 FileStatus filestatus = fs.getFileStatus(path);
22 //獲取文件塊位置列表
23 blkLocations = fs.getFileBlockLocations(filestatus, 0, filestatus.getLen());
24 } catch (IOException e) {
25 logger.error("", e);
26 }
27 return blkLocations;
28 }
8、文件重命名
1 /**
2 * 文件重命名
3 *
4 * @param srcPath
5 * @param dstPath
6 */
7 public boolean rename(String srcPath, String dstPath){
8 boolean flag = false;
9 try {
10 // 返回FileSystem對象
11 FileSystem fs = getFileSystem();
12
13 String hdfsUri = HDFSUri;
14 if(StringUtils.isNotBlank(hdfsUri)){
15 srcPath = hdfsUri + srcPath;
16 dstPath = hdfsUri + dstPath;
17 }
18
19 flag = fs.rename(new Path(srcPath), new Path(dstPath));
20 } catch (IOException e) {
21 logger.error("{} rename to {} error.", srcPath, dstPath);
22 }
23
24 return flag;
25 }
9、判斷目錄是否存在
1 /**
2 * 判斷目錄是否存在
3 *
4 * @param srcPath
5 * @param dstPath
6 */
7 public boolean existDir(String filePath, boolean create){
8 boolean flag = false;
9
10 if (StringUtils.isEmpty(filePath)){
11 return flag;
12 }
13
14 try{
15 Path path = new Path(filePath);
16 // FileSystem對象
17 FileSystem fs = getFileSystem();
18
19 if (create){
20 if (!fs.exists(path)){
21 fs.mkdirs(path);
22 }
23 }
24
25 if (fs.isDirectory(path)){
26 flag = true;
27 }
28 }catch (Exception e){
29 logger.error("", e);
30 }
31
32 return flag;
33 }

