分布式文件系統HDFS中對文件/目錄的相關操作代碼,整理了一下,大概包括以下部分:
- 文件夾的新建、刪除、重命名
- 文件夾中子文件和目錄的統計
- 文件的新建及顯示文件內容
- 文件在local和remote間的相互復制
- 定位文件在HDFS中的位置,以及副本存放的主機
- HDFS資源使用情況
1. 新建文件夾
/**
 * Creates the given directory on HDFS if it does not already exist.
 *
 * @param folder HDFS path of the directory to create
 * @throws IOException if the filesystem cannot be reached or the mkdir fails
 */
public void mkdirs(String folder) throws IOException {
    Path path = new Path(folder);
    // try-with-resources guarantees the FileSystem handle is released even
    // when exists()/mkdirs() throws (the original leaked it on error).
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf)) {
        if (!fs.exists(path)) {
            fs.mkdirs(path);
            System.out.println("Create: " + folder);
        }
    }
}
2. 刪除文件夾
/**
 * Recursively deletes the given directory from HDFS immediately.
 *
 * @param folder HDFS path of the directory to delete
 * @throws IOException if the filesystem cannot be reached
 */
public void rmr(String folder) throws IOException {
    Path path = new Path(folder);
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf)) {
        // Bug fix: deleteOnExit() only schedules removal for JVM shutdown,
        // so the original printed "Delete" without actually deleting anything
        // yet. delete(path, true) removes the directory tree right away.
        fs.delete(path, true);
        System.out.println("Delete: " + folder);
    }
}
3. 文件重命名
/**
 * Renames (moves) an HDFS path.
 *
 * @param src source HDFS path
 * @param dst destination HDFS path
 * @throws IOException if the filesystem cannot be reached
 */
public void rename(String src, String dst) throws IOException {
    Path name1 = new Path(src);
    Path name2 = new Path(dst);
    // try-with-resources closes the FileSystem even if rename() throws
    // (the original leaked the handle on error).
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf)) {
        // NOTE(review): rename() returns false on failure without throwing;
        // callers relying on the unconditional message below should check it.
        fs.rename(name1, name2);
        System.out.println("Rename: from " + src + " to " + dst);
    }
}
4. 列出文件夾中的子文件及目錄
/**
 * Lists the immediate children (files and sub-directories) of an HDFS folder,
 * printing each entry's path, directory flag and length.
 *
 * @param folder HDFS path of the directory to list
 * @throws IOException if the filesystem cannot be reached or the path is missing
 */
public void ls(String folder) throws IOException {
    Path path = new Path(folder);
    // try-with-resources releases the FileSystem even when listStatus throws.
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf)) {
        FileStatus[] list = fs.listStatus(path);
        System.out.println("ls: " + folder);
        System.out.println("==========================================================");
        for (FileStatus f : list) {
            System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDirectory(), f.getLen());
        }
        System.out.println("==========================================================");
    }
}
5. 創建文件,並添加內容
/**
 * Creates (or overwrites) an HDFS file and writes the given string content to it.
 *
 * @param file    HDFS path of the file to create
 * @param content text written to the file
 * @throws IOException if the filesystem cannot be reached or the write fails
 */
public void createFile(String file, String content) throws IOException {
    // NOTE(review): getBytes() uses the platform default charset; consider
    // StandardCharsets.UTF_8 for deterministic encoding — kept as-is to
    // preserve existing behavior.
    byte[] buff = content.getBytes();
    // try-with-resources closes both the output stream and the FileSystem in
    // the right order, even on failure (the original leaked fs if create threw).
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
         FSDataOutputStream os = fs.create(new Path(file))) {
        os.write(buff, 0, buff.length);
        System.out.println("Create: " + file);
    }
}
6. 將local數據復制到remote
/**
 * Uploads a local file to HDFS.
 *
 * @param local  path of the local source file
 * @param remote HDFS destination path
 * @throws IOException if the filesystem cannot be reached or the copy fails
 */
public void copyFile(String local, String remote) throws IOException {
    // try-with-resources releases the FileSystem even when the copy throws
    // (the original leaked the handle on error).
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf)) {
        fs.copyFromLocalFile(new Path(local), new Path(remote));
        System.out.println("copy from: " + local + " to " + remote);
    }
}
7. 將remote數據下載到local
/**
 * Downloads an HDFS file to the local filesystem.
 *
 * @param remote HDFS source path
 * @param local  local destination path
 * @throws IOException if the filesystem cannot be reached or the copy fails
 */
public void download(String remote, String local) throws IOException {
    Path path = new Path(remote);
    // try-with-resources releases the FileSystem even when the copy throws.
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf)) {
        fs.copyToLocalFile(path, new Path(local));
        // Fixed log message: original printed "download: from<path>" with no
        // space between "from" and the path.
        System.out.println("download: from " + remote + " to " + local);
    }
}
8. 顯示文件內容
/**
 * Reads an HDFS file, prints its content to stdout and returns it as a string.
 *
 * <p>NOTE(review): the bytes are decoded with the platform default charset
 * (ByteArrayOutputStream.toString()); kept as-is to preserve behavior.
 *
 * @param remoteFile HDFS path of the file to read
 * @return the file content as a string
 * @throws IOException if the filesystem cannot be reached or the read fails
 */
public String cat(String remoteFile) throws IOException {
    Path path = new Path(remoteFile);
    System.out.println("cat: " + remoteFile);
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    // try-with-resources replaces the manual finally block: both the input
    // stream and the FileSystem are closed in reverse order, even on failure.
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
         FSDataInputStream fsdis = fs.open(path)) {
        // 'false' tells copyBytes not to close the streams itself.
        IOUtils.copyBytes(fsdis, baos, 4096, false);
    }
    String str = baos.toString();
    System.out.println(str);
    return str;
}
9. 定位一個文件在HDFS中存儲的位置,以及多個副本存儲在集群哪些節點上
/**
 * Prints the hosts storing the block replicas of the hard-coded file
 * {@code create/t2.txt} under the configured HDFS root.
 *
 * @throws IOException if the filesystem cannot be reached or the file is missing
 */
public void location() throws IOException {
    String folder = hdfsPath + "create/";
    String file = "t2.txt";
    // Consistency fix: use the injected 'conf' instead of a fresh
    // Configuration(), so the resources loaded in config() (core-site.xml,
    // hdfs-site.xml, ...) are honored like in every other method.
    // try-with-resources also fixes the handle leak on exception.
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf)) {
        FileStatus f = fs.getFileStatus(new Path(folder + file));
        BlockLocation[] list = fs.getFileBlockLocations(f, 0, f.getLen());
        System.out.println("File Location: " + folder + file);
        for (BlockLocation bl : list) {
            String[] hosts = bl.getHosts();
            for (String host : hosts) {
                System.out.println("host:" + host);
            }
        }
    }
}
10. 獲取HDFS集群存儲資源使用情況
/**
 * Prints the HDFS cluster's total, used and remaining storage capacity
 * (in bytes). Errors are reported to stderr instead of being thrown.
 */
public void getTotalCapacity() {
    // try-with-resources fixes the original's leak: the FileSystem handle
    // was never closed on either the success or the failure path.
    try (FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf)) {
        FsStatus fsStatus = fs.getStatus();
        System.out.println("總容量:" + fsStatus.getCapacity());
        System.out.println("使用容量:" + fsStatus.getUsed());
        System.out.println("剩余容量:" + fsStatus.getRemaining());
    } catch (IOException e) {
        // Best-effort reporting kept from the original; a logger would be
        // preferable if one is available in this project.
        e.printStackTrace();
    }
}
完整代碼

import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.URI; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsStatus; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.JobConf; /* * HDFS工具類 * */ public class Hdfs { private static final String HDFS = "hdfs://10.20.14.47:8020/"; public Hdfs(Configuration conf) { this(HDFS, conf); } public Hdfs(String hdfs, Configuration conf) { this.hdfsPath = hdfs; this.conf = conf; } private String hdfsPath; private Configuration conf; public static void main(String[] args) throws IOException { JobConf conf = config(); Hdfs hdfs = new Hdfs(conf); hdfs.createFile("/create/t2.txt", "12"); hdfs.location(); } public static JobConf config() { JobConf conf = new JobConf(Hdfs.class); conf.setJobName("HdfsDAO"); conf.addResource("classpath:/hadoop/core-site.xml"); conf.addResource("classpath:/hadoop/hdfs-site.xml"); conf.addResource("classpath:/hadoop/mapred-site.xml"); return conf; } /* * 創建文件夾 */ public void mkdirs(String folder) throws IOException { Path path = new Path(folder); FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); if (!fs.exists(path)) { fs.mkdirs(path); System.out.println("Create: " + folder); } fs.close(); } /* * 刪除文件夾 */ public void rmr(String folder) throws IOException { Path path = new Path(folder); FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); fs.deleteOnExit(path); System.out.println("Delete: " + folder); fs.close(); } /* * 文件重命名 */ public void rename(String src, String dst) throws IOException { Path name1 = new Path(src); Path name2 = new Path(dst); FileSystem fs = FileSystem.get(URI.create(hdfsPath), 
conf); fs.rename(name1, name2); System.out.println("Rename: from " + src + " to " + dst); fs.close(); } /* * 列出文件夾中的子文件及目錄 */ public void ls(String folder) throws IOException { Path path = new Path(folder); FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); FileStatus[] list = fs.listStatus(path); System.out.println("ls: " + folder); System.out.println("=========================================================="); for (FileStatus f : list) { System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDirectory(), f.getLen()); } System.out.println("=========================================================="); fs.close(); } /* * 創建文件,並添加內容 */ public void createFile(String file, String content) throws IOException { FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); byte[] buff = content.getBytes(); FSDataOutputStream os = null; try { os = fs.create(new Path(file)); os.write(buff, 0, buff.length); System.out.println("Create: " + file); } finally { if (os != null) os.close(); } fs.close(); } /* * 將local的數據復制到remote */ public void copyFile(String local, String remote) throws IOException { FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); fs.copyFromLocalFile(new Path(local), new Path(remote)); System.out.println("copy from: " + local + " to " + remote); fs.close(); } /* * 將remote數據下載到local */ public void download(String remote, String local) throws IOException { Path path = new Path(remote); FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); fs.copyToLocalFile(path, new Path(local)); System.out.println("download: from" + remote + " to " + local); fs.close(); } /* * 顯示文件內容 */ public String cat(String remoteFile) throws IOException { Path path = new Path(remoteFile); FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); FSDataInputStream fsdis = null; System.out.println("cat: " + remoteFile); OutputStream baos = new ByteArrayOutputStream(); String str = null; try { fsdis = fs.open(path); IOUtils.copyBytes(fsdis, baos, 
4096, false); str = baos.toString(); } finally { IOUtils.closeStream(fsdis); fs.close(); } System.out.println(str); return str; } /* * 定位一個文件在HDFS中存儲的位置,以及多個副本存儲在集群哪些節點上 */ public void location() throws IOException { String folder = hdfsPath + "create/"; String file = "t2.txt"; FileSystem fs = FileSystem.get(URI.create(hdfsPath), new Configuration()); FileStatus f = fs.getFileStatus(new Path(folder + file)); BlockLocation[] list = fs.getFileBlockLocations(f, 0, f.getLen()); System.out.println("File Location: " + folder + file); for (BlockLocation bl : list) { String[] hosts = bl.getHosts(); for (String host : hosts) { System.out.println("host:" + host); } } fs.close(); } /* * 獲取HDFS資源使用情況 */ public void getTotalCapacity() { try { FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); FsStatus fsStatus = fs.getStatus(); System.out.println("總容量:" + fsStatus.getCapacity()); System.out.println("使用容量:" + fsStatus.getUsed()); System.out.println("剩余容量:" + fsStatus.getRemaining()); } catch (IOException e) { e.printStackTrace(); } } /* * 獲取某文件中包含的目錄數,文件數,及占用空間大小 */ public void getContentSummary(String path) { ContentSummary cs = null; try { FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf); cs = fs.getContentSummary(new Path(path)); } catch (Exception e) { e.printStackTrace(); } // 目錄數 Long directoryCount = cs.getDirectoryCount(); // 文件數 Long fileCount = cs.getFileCount(); // 占用空間 Long length = cs.getLength(); System.out.println("目錄數:" + directoryCount); System.out.println("文件數:" + fileCount); System.out.println("占用空間:" + length); } }