HDFS文件目錄操作代碼


分布式文件系統HDFS中對文件/目錄的相關操作代碼,整理了一下,大概包括以下部分:

  • 文件夾的新建、刪除、重命名
  • 文件夾中子文件和目錄的統計
  • 文件的新建及顯示文件內容
  • 文件在local和remote間的相互複製
  • 定位文件在HDFS中的位置,以及副本存放的主機
  • HDFS資源使用情況

1. 新建文件夾

/**
 * Creates the directory {@code folder} if it does not exist yet.
 *
 * @param folder path of the directory to create
 * @throws IOException if the file system cannot be reached or the directory cannot be created
 */
public void mkdirs(String folder) throws IOException {
    Path path = new Path(folder);
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    try {
        // Report a creation only when this call actually created the directory.
        if (!fs.exists(path) && fs.mkdirs(path)) {
            System.out.println("Create: " + folder);
        }
    } finally {
        // Close the handle even when exists()/mkdirs() throws.
        fs.close();
    }
}

 

2. 刪除文件夾

/**
 * Recursively deletes {@code folder} and everything below it.
 *
 * @param folder path of the directory (or file) to remove
 * @throws IOException if the file system cannot be reached or the deletion fails
 */
public void rmr(String folder) throws IOException {
    Path path = new Path(folder);
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    try {
        // delete(path, true) removes the tree immediately; deleteOnExit() merely
        // schedules the removal for when the file system is closed and hides failures.
        if (fs.delete(path, true)) {
            System.out.println("Delete: " + folder);
        }
    } finally {
        fs.close();
    }
}

 

3. 文件重命名

/**
 * Renames (moves) {@code src} to {@code dst}.
 *
 * @param src existing path
 * @param dst new path
 * @throws IOException if the file system cannot be reached or the rename raises an error
 */
public void rename(String src, String dst) throws IOException {
    Path name1 = new Path(src);
    Path name2 = new Path(dst);
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    try {
        // rename() signals failure through its boolean result, so it must be checked
        // before claiming success.
        if (fs.rename(name1, name2)) {
            System.out.println("Rename: from " + src + " to " + dst);
        } else {
            System.err.println("Rename failed: from " + src + " to " + dst);
        }
    } finally {
        fs.close();
    }
}

 

4. 列出文件夾中的子文件及目錄

/**
 * Prints the direct children of {@code folder}: path, directory flag and length.
 *
 * @param folder directory to list
 * @throws IOException if the file system cannot be reached or the listing fails
 */
public void ls(String folder) throws IOException {
    Path path = new Path(folder);
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    try {
        FileStatus[] list = fs.listStatus(path);

        System.out.println("ls: " + folder);
        System.out.println("==========================================================");
        for (FileStatus f : list) {
            System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDirectory(), f.getLen());
        }
        System.out.println("==========================================================");
    } finally {
        // Close the handle even when listStatus() throws.
        fs.close();
    }
}

 

5. 創建文件,並添加內容

/**
 * Creates {@code file} and writes {@code content} into it, encoded as UTF-8.
 *
 * @param file    path of the file to create
 * @param content text to store in the file
 * @throws IOException if the file cannot be created or written
 */
public void createFile(String file, String content) throws IOException {
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    try {
        // Fixed charset: the platform default differs between machines and would make
        // the stored bytes depend on where the client runs.
        byte[] buff = content.getBytes("UTF-8");
        FSDataOutputStream os = fs.create(new Path(file));
        try {
            os.write(buff, 0, buff.length);
        } finally {
            os.close();
        }
        System.out.println("Create: " + file);
    } finally {
        // Close the file system even when create()/write() throws.
        fs.close();
    }
}

 

6. 將local數據複製到remote

/**
 * Copies the local file {@code local} to the remote path {@code remote}.
 *
 * @param local  source path on the local file system
 * @param remote destination path on the remote file system
 * @throws IOException if the copy fails
 */
public void copyFile(String local, String remote) throws IOException {
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    try {
        fs.copyFromLocalFile(new Path(local), new Path(remote));
        System.out.println("copy from: " + local + " to " + remote);
    } finally {
        // Close the handle even when the copy throws.
        fs.close();
    }
}

 

7. 將remote數據下載到local

/**
 * Copies the remote file {@code remote} to the local path {@code local}.
 *
 * @param remote source path on the remote file system
 * @param local  destination path on the local file system
 * @throws IOException if the copy fails
 */
public void download(String remote, String local) throws IOException {
    Path path = new Path(remote);
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    try {
        fs.copyToLocalFile(path, new Path(local));
        // A space after "from" keeps the message from running into the path.
        System.out.println("download: from " + remote + " to " + local);
    } finally {
        // Close the handle even when the copy throws.
        fs.close();
    }
}

 

8. 顯示文件內容

    /**
     * Reads the whole of {@code remoteFile}, prints it and returns it as a string.
     * The bytes are decoded as UTF-8.
     *
     * @param remoteFile path of the file to read
     * @return the file content
     * @throws IOException if the file cannot be opened or read
     */
    public String cat(String remoteFile) throws IOException {
        Path path = new Path(remoteFile);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        System.out.println("cat: " + remoteFile);

        // Declared with its concrete type so the charset-aware toString(String) is reachable.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        FSDataInputStream fsdis = null;
        String str;

        try {
            fsdis = fs.open(path);
            IOUtils.copyBytes(fsdis, baos, 4096, false);
            // Fixed charset instead of the platform default, which varies between machines.
            str = baos.toString("UTF-8");
        } finally {
            IOUtils.closeStream(fsdis);
            fs.close();
        }
        System.out.println(str);
        return str;
    }

 

9. 定位一個文件在HDFS中存儲的位置,以及多個副本存儲在集群哪些節點上

/**
 * Prints the hosts storing the blocks of the sample file {@code create/t2.txt}
 * located under {@code hdfsPath}.
 *
 * @throws IOException if the file status or block locations cannot be fetched
 */
public void location() throws IOException {
    location(hdfsPath + "create/" + "t2.txt");
}

/**
 * Prints, for every block of {@code file}, the hosts reported for that block.
 *
 * @param file path of the file to inspect
 * @throws IOException if the file status or block locations cannot be fetched
 */
public void location(String file) throws IOException {
    // Use the instance configuration, like every other operation, rather than a fresh default one.
    FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    try {
        FileStatus f = fs.getFileStatus(new Path(file));
        BlockLocation[] list = fs.getFileBlockLocations(f, 0, f.getLen());

        System.out.println("File Location: " + file);
        for (BlockLocation bl : list) {
            for (String host : bl.getHosts()) {
                System.out.println("host:" + host);
            }
        }
    } finally {
        // Close the handle even when a lookup throws.
        fs.close();
    }
}

 

10. 獲取HDFS集群存儲資源使用情況

/**
 * Prints the capacity, used and remaining figures reported by the file system status.
 * An {@link IOException} is reported via its stack trace rather than propagated.
 */
public void getTotalCapacity() {
    FileSystem fs = null;
    try {
        fs = FileSystem.get(URI.create(hdfsPath), conf);
        FsStatus fsStatus = fs.getStatus();
        System.out.println("總容量:" + fsStatus.getCapacity());
        System.out.println("使用容量:" + fsStatus.getUsed());
        System.out.println("剩余容量:" + fsStatus.getRemaining());
    } catch (IOException e) {
        e.printStackTrace();
    } finally {
        // The other operations close their handle; do the same here (null-safe).
        IOUtils.closeStream(fs);
    }
}

 

完整代碼

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.JobConf;

/**
 * HDFS utility class: directory and file operations on top of the Hadoop
 * {@link FileSystem} API. Every operation obtains a file system handle for
 * {@code hdfsPath} and closes it when finished.
 *
 * NOTE(review): FileSystem.get may hand out a cached, shared instance; closing it
 * could affect other users in the same JVM — confirm this is acceptable for callers.
 */
public class Hdfs {

    private static final String HDFS = "hdfs://10.20.14.47:8020/";

    private final String hdfsPath;
    private final Configuration conf;

    /**
     * Creates a helper bound to the default cluster address.
     *
     * @param conf Hadoop configuration used for every operation
     */
    public Hdfs(Configuration conf) {
        this(HDFS, conf);
    }

    /**
     * Creates a helper bound to the given file system URI.
     *
     * @param hdfs file system URI, e.g. {@code hdfs://host:port/}
     * @param conf Hadoop configuration used for every operation
     */
    public Hdfs(String hdfs, Configuration conf) {
        this.hdfsPath = hdfs;
        this.conf = conf;
    }

    /** Demo entry point: creates a small file and prints where its blocks are stored. */
    public static void main(String[] args) throws IOException {
        JobConf conf = config();
        Hdfs hdfs = new Hdfs(conf);
        hdfs.createFile("/create/t2.txt", "12");
        hdfs.location();
    }

    /**
     * Builds the job configuration and registers the site files as classpath resources.
     *
     * @return the configuration used by {@link #main(String[])}
     */
    public static JobConf config() {
        JobConf conf = new JobConf(Hdfs.class);
        conf.setJobName("HdfsDAO");
        // addResource(String) looks the name up on the classpath as-is, so the
        // Spring-style "classpath:/" prefix would prevent the files from being found.
        conf.addResource("hadoop/core-site.xml");
        conf.addResource("hadoop/hdfs-site.xml");
        conf.addResource("hadoop/mapred-site.xml");
        return conf;
    }

    /**
     * Creates the directory {@code folder} if it does not exist yet.
     *
     * @param folder path of the directory to create
     * @throws IOException if the file system cannot be reached or the directory cannot be created
     */
    public void mkdirs(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            // Report a creation only when this call actually created the directory.
            if (!fs.exists(path) && fs.mkdirs(path)) {
                System.out.println("Create: " + folder);
            }
        } finally {
            fs.close();
        }
    }

    /**
     * Recursively deletes {@code folder} and everything below it.
     *
     * @param folder path of the directory (or file) to remove
     * @throws IOException if the file system cannot be reached or the deletion fails
     */
    public void rmr(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            // delete(path, true) removes the tree immediately; deleteOnExit() merely
            // schedules the removal for when the file system is closed and hides failures.
            if (fs.delete(path, true)) {
                System.out.println("Delete: " + folder);
            }
        } finally {
            fs.close();
        }
    }

    /**
     * Renames (moves) {@code src} to {@code dst}.
     *
     * @param src existing path
     * @param dst new path
     * @throws IOException if the file system cannot be reached or the rename raises an error
     */
    public void rename(String src, String dst) throws IOException {
        Path name1 = new Path(src);
        Path name2 = new Path(dst);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            // rename() signals failure through its boolean result, so it must be checked
            // before claiming success.
            if (fs.rename(name1, name2)) {
                System.out.println("Rename: from " + src + " to " + dst);
            } else {
                System.err.println("Rename failed: from " + src + " to " + dst);
            }
        } finally {
            fs.close();
        }
    }

    /**
     * Prints the direct children of {@code folder}: path, directory flag and length.
     *
     * @param folder directory to list
     * @throws IOException if the file system cannot be reached or the listing fails
     */
    public void ls(String folder) throws IOException {
        Path path = new Path(folder);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            FileStatus[] list = fs.listStatus(path);

            System.out.println("ls: " + folder);
            System.out.println("==========================================================");
            for (FileStatus f : list) {
                System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDirectory(), f.getLen());
            }
            System.out.println("==========================================================");
        } finally {
            fs.close();
        }
    }

    /**
     * Creates {@code file} and writes {@code content} into it, encoded as UTF-8.
     *
     * @param file    path of the file to create
     * @param content text to store in the file
     * @throws IOException if the file cannot be created or written
     */
    public void createFile(String file, String content) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            // Fixed charset: the platform default differs between machines and would make
            // the stored bytes depend on where the client runs.
            byte[] buff = content.getBytes("UTF-8");
            FSDataOutputStream os = fs.create(new Path(file));
            try {
                os.write(buff, 0, buff.length);
            } finally {
                os.close();
            }
            System.out.println("Create: " + file);
        } finally {
            fs.close();
        }
    }

    /**
     * Copies the local file {@code local} to the remote path {@code remote}.
     *
     * @param local  source path on the local file system
     * @param remote destination path on the remote file system
     * @throws IOException if the copy fails
     */
    public void copyFile(String local, String remote) throws IOException {
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            fs.copyFromLocalFile(new Path(local), new Path(remote));
            System.out.println("copy from: " + local + " to " + remote);
        } finally {
            fs.close();
        }
    }

    /**
     * Copies the remote file {@code remote} to the local path {@code local}.
     *
     * @param remote source path on the remote file system
     * @param local  destination path on the local file system
     * @throws IOException if the copy fails
     */
    public void download(String remote, String local) throws IOException {
        Path path = new Path(remote);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            fs.copyToLocalFile(path, new Path(local));
            // A space after "from" keeps the message from running into the path.
            System.out.println("download: from " + remote + " to " + local);
        } finally {
            fs.close();
        }
    }

    /**
     * Reads the whole of {@code remoteFile}, prints it and returns it as a string.
     * The bytes are decoded as UTF-8, matching {@link #createFile(String, String)}.
     *
     * @param remoteFile path of the file to read
     * @return the file content
     * @throws IOException if the file cannot be opened or read
     */
    public String cat(String remoteFile) throws IOException {
        Path path = new Path(remoteFile);
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        System.out.println("cat: " + remoteFile);

        // Declared with its concrete type so the charset-aware toString(String) is reachable.
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        FSDataInputStream fsdis = null;
        String str;

        try {
            fsdis = fs.open(path);
            IOUtils.copyBytes(fsdis, baos, 4096, false);
            str = baos.toString("UTF-8");
        } finally {
            IOUtils.closeStream(fsdis);
            fs.close();
        }
        System.out.println(str);
        return str;
    }

    /**
     * Prints the hosts storing the blocks of the sample file {@code create/t2.txt}
     * located under {@code hdfsPath}.
     *
     * @throws IOException if the file status or block locations cannot be fetched
     */
    public void location() throws IOException {
        location(hdfsPath + "create/" + "t2.txt");
    }

    /**
     * Prints, for every block of {@code file}, the hosts reported for that block.
     *
     * @param file path of the file to inspect
     * @throws IOException if the file status or block locations cannot be fetched
     */
    public void location(String file) throws IOException {
        // Use the instance configuration, like every other operation, rather than a fresh default one.
        FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
        try {
            FileStatus f = fs.getFileStatus(new Path(file));
            BlockLocation[] list = fs.getFileBlockLocations(f, 0, f.getLen());

            System.out.println("File Location: " + file);
            for (BlockLocation bl : list) {
                for (String host : bl.getHosts()) {
                    System.out.println("host:" + host);
                }
            }
        } finally {
            fs.close();
        }
    }

    /**
     * Prints the capacity, used and remaining figures reported by the file system status.
     * An {@link IOException} is reported via its stack trace rather than propagated.
     */
    public void getTotalCapacity() {
        FileSystem fs = null;
        try {
            fs = FileSystem.get(URI.create(hdfsPath), conf);
            FsStatus fsStatus = fs.getStatus();
            System.out.println("總容量:" + fsStatus.getCapacity());
            System.out.println("使用容量:" + fsStatus.getUsed());
            System.out.println("剩余容量:" + fsStatus.getRemaining());
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // The other operations close their handle; do the same here (null-safe).
            IOUtils.closeStream(fs);
        }
    }

    /**
     * Prints the directory count, file count and total length reported by the
     * content summary of {@code path}. An {@link IOException} is reported via its
     * stack trace rather than propagated.
     *
     * @param path path whose content summary is printed
     */
    public void getContentSummary(String path) {
        FileSystem fs = null;
        try {
            fs = FileSystem.get(URI.create(hdfsPath), conf);
            ContentSummary cs = fs.getContentSummary(new Path(path));

            // Printing inside the try means a failed lookup can no longer reach a
            // null summary and end in a NullPointerException.
            long directoryCount = cs.getDirectoryCount();
            long fileCount = cs.getFileCount();
            long length = cs.getLength();

            System.out.println("目錄數:" + directoryCount);
            System.out.println("文件數:" + fileCount);
            System.out.println("占用空間:" + length);
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            IOUtils.closeStream(fs);
        }
    }
}
View Code

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM