HDFS的Java API操作


通過Java代碼操作HDFS集群

目錄

引言

Idea連接HDFS

第一步:引入HDFS依賴

第一種引入方式(jar包)

第二種引入方式(使用maven引用)

引入項目依賴的時候:

maven項目的幾個核心的生命周期:

第二步:idea操作

配置HDFS的訪問路徑

單元測試

HDFS的JavaAPI基本操作

Maven依賴准備

hdfs文件系統的API使用

操作HDFS:

連接HDFS文件系統------是必備操作(見二、idea連接HDFS)

對HDFS進行操作

遞歸獲取HDFS所有文件

FileSystem其他方法

IO流操作HDFS

利用IO流實現文件的上傳和下載

利用IO流實現定位文件讀取


引言

要想在Windows上操作HDFS,首先需要在Windows上安裝HDFS。由於Hadoop官網沒有提供Windows下載版本,所以需要對 Hadoop.tar.gz進行兩次解壓(推薦用7-zip軟件),解壓完成后添加相應環境變量:HADOOP_HOME、Path

 

Idea連接HDFS

第一步:引入HDFS依賴

第一種引入方式(jar包)

自己找jar包然后插入程序中(HDFS編程所需jar包都在Hadoop安裝目錄的share目錄下,此處將jar包歸類了三個文件夾

         導入到idea中:

 

第二種引入方式(使用maven引用)

maven項目創建后是如下結構:

          

                其中:

        src                main                 javaJava源代碼
                 resourceJava中的一些靜態紫竹院,如文件、圖片、HTML文件等
                test                 Java:專門用來編寫Java Junit單元測試代碼

引入項目依賴的時候:

gav....
  scope:引入的依賴jar包的一個作用范圍
  runtime:項目運行過程中也要使用
  test:項目在測試過程中才能去使用
  provided:項目在編譯時和運行時都起作用

maven項目的幾個核心的生命周期:

 clean:清楚上一次編譯的結果

compile:編譯源代碼
test:執行maven項目的test包下的單元測試代碼
package:如果test階段測試通過,那么將項目打包成對應的包
install:創建maven項目的時候也指定了當前項目的gav坐標,執行install之后會將項目放入本地的maven倉庫

 

第二步:idea操作

引入配置文件,指定我們在去連接HDFS的時候我們應該采取什么樣的配置:

import org.apache.hadoop.conf.Configuration;
 
 
Configuration conf = new Configuration();

 

  • 配置HDFS的訪問路徑

conf.set("fs.defaultFS", "hdfs://192.168.218.55:9000");
這個操作就是配置HDFS文件的core-site.xml

 

  • 根據配置項獲取文件系統
    import org.apache.hadoop.fs.FileSystem;
     
    FileSystem = FileSystem.get(conf);

     

 此時運行程序,控制台顯示

說明運行成功,此時可以通過fileSystem實現對HDFS的遠程操作。例如:

(1)創建一個目錄

Path pm = new Path("/test");
fileSystem.mkdirs(pm);

(2)創建一個文件

Path pf = new Path("/a.txt");
fileSystem.create(pf);

 Notes此處進行操作時有可能報的錯誤:

        這個錯誤產生的原因是:在 / 這個目錄下只有root用戶能進行 rwx 操作,而用本機的idea進行操作時使用的是Windows的當前用戶,故需要將該目錄設置為:所有用戶都可以進行 rwx 操作。 

        還可能產生一個錯誤:HADOOP_HOME and Hadoop.home.dir are unset. 這個錯誤是找不到 HADOOP_HOME的配置路徑,如果配置沒問題但是還是出錯的話(有可能是電腦版本型號的問題),可以直接在idea中配置好Hadoop所在目錄:

conf.set("hadoop.home.dir", "d:\\software\\hadoop");

 

單元測試

       在編寫Java代碼的時候,如果想要運行一個Java程序,那么必須創建一個main方法,比較麻煩,比如現在想要測試HDFS的JavaAPI的文件上傳 和文件下載 的功能,那如果使用main方法,我們需要創建兩個Java類,比較復雜,后期找的時候也比較麻煩。因此在Java中提供了一個工具-----Junit單元測試

       單元測試是屬於Java的一個測試方法,最直接的表現形式就是在一個Java文件中可以創建多個“main”方法,如果想要使用單元測試,那么必須引入單元測試的jar包

       單元測試最大的特點就是可以讓Java中的普通方法擁有main方法的權利。使用方法:在方法前加入注解:@Test / @Before / @After

@Test:注解就是給Java類的普通方法增加main方法的執行權限,並且在運行的時候只會運行當前的這個方法單元,在一個類中可以存在多個方法單元

@Before:此處補充一個概念:代碼塊:有一個特點,在執行構造器時先執行代碼塊。而@Before方法類似於Java中的代碼塊,@Test單元測試方法執行之前必須先執行@Before修飾的方法中的內容。通常做一些前提准備

@After在@Test修飾的單元測試代碼方法執行完成之后,會調用@After修飾的方法。通常做一些銷毀工作

 

HDFS的JavaAPI基本操作

Maven依賴准備

 

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hadoop.version>2.8.5</hadoop.version>
</properties>
 
<!--引入單元測試需要的jar包-->
<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.1</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.19</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>

 

 

 

hdfs文件系統的API使用

注意:這些類都在hadoop包下

  • Configuration:HDFS的配置相關類,等同於hadoop中的*-site.xml中的配置。配置文件在代碼中采用類似於map集合的key-value鍵值對的表現形式。配置文件當中配置的配置項執行的優先級要高於我們在Hadoop軟件中的配置
  • FileSystem:HDFS文件系統對應的Java類,可以通過這個類獲取HDFS上的任何文件和目錄

copyFromLocalFile

copyToLocalFile

moveFromLocalFile

moveToLocalFile

rename

delete

mkdirs

listFiles

listStatus

  • Path類:是HDFS上文件在Java中的一個抽象表示,類似於File類
  • FileStatus(LocatedFileStatus:里面包含文件的詳情信息(文件路徑、文件權限、文件用戶、文件修改時間、文件大小等)

操作HDFS:

  • 連接HDFS文件系統------是必備操作(見二、idea連接HDFS

  • 對HDFS進行操作

    • FileSystem
      • 文件上傳
              命令:moveFromLocalcopyFromLocalput
              javaAPI
        copyFromLocalFile(Path, Path):復制文件
                               moveFromLocalFile(Path, Path):剪切文件
      • 文件下載
              命令:copyToLocalmoveToLocalget
              JavaAPI
        copyToLocalFile(Path, Path)
                                moveToLocalFile(Path, Path)
      • 文件刪除
              命令:-rm -r -rmdir
              JavaAPI
        delete(Path, boolean)
      • 創建文件夾
              命令:mkdir   -p
              JavaAPI
        mkdirs(Path)
      • 更改文件名
              命令:mv 文件路徑 更改重命名的文件路徑
              JavaAPIrename(Path, Path)
      • 查詢某一路徑下的文件,可以遞歸:listFiles
      • 查詢某一路徑下文件或文件夾,不可以遞歸:listStatus(常用)
    • FileStatus(LocatedFileStatus):里面包含了文件的詳情信息:文件路徑、文件權限、文件用戶、文件修改時間、文件大小....
    • 案例:
public class HDFSTestDemo {
    private FileSystem fs;
    @Before
    public void init() {
        // hdfs的連接配置
        Configuration conf = new Configuration();
        // 配置hdfs副本數
        conf.set("dfs.replication", "1");
        // 配置hdfs的地址----就是NameNode的地址,NameNode的地址在core-site.xml文件中配置的fs.defaultFS
//        conf.set("fs.defaultFS", "hdfs://192.168.218.55:9000");
        // 獲取文件系統
        try {
//            fs = FileSystem.get(conf);
            fs = FileSystem.get(new URI("hdfs://192.168.218.55:9000"), conf, "root");
            System.out.println(fs);
            System.out.println("文件系統獲取成功");
        } catch (IOException | URISyntaxException | InterruptedException e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 文件上傳測試
     */
    @Test
    public void upload() {
        Path localSrc = new Path("C:\\Users\\15336\\Desktop\\mm.txt");
        Path linuxDe = new Path("hdfs://192.168.218.55:9000/test");
        try {
            fs.copyFromLocalFile(localSrc, linuxDe);
            System.out.println("文件上傳成功");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 文件下載測試
     */
    @Test
    public void download() {
        Path localDst = new Path("E:\\");
        Path linuxSrc = new Path("hdfs://192.168.218.55:9000/test/mm.txt");
        try {
            fs.copyToLocalFile(linuxSrc, localDst);
            System.out.println("文件下載成功");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
 
    /**
     * 創建目錄
     */
    @Test
    public void mkdir() throws IOException {
        boolean mkdirs = fs.mkdirs(new Path("hdfs://192.168.218.55:9000/test/school"));
        if (mkdirs) {
            System.out.println("目錄創建成功");
        }
    }
 
    /**
     * 刪除目錄
     */
    @Test
    public void delete() throws IOException {
        /**
         * 需要傳入兩個參數
         * @params1: 目錄路徑
         * @params2: boolean類型的值,代表是否遞歸刪除(如果要刪除多層目錄,則為true)
         */
        boolean delete = fs.delete(new Path("hdfs://192.168.218.55:9000/school"), false);
        if (delete) {
            System.out.println("刪除成功");
        }
    }
 
    /**
     * 重命名或更改文件目錄
     */
    @Test
    public void rename() throws IOException {
        boolean rename = fs.rename(new Path("hdfs://192.168.218.55:9000/test/mm.txt"), new Path("hdfs://192.168.218.55:9000/test/vv.txt"));
        if (rename) {
            System.out.println("重命名成功");
        }
    }
 
    /**
     * 查看HDFS文件系統上有哪些文件和文件夾
     */
    @Test
    public  void listFiles() throws IOException {
        /**
         * listFiles 只是查詢文件的,目錄不能查詢
         *      Path:查詢的是哪一個路徑
         *      boolean:如果這個路徑下有一個子文件夾,那么是否把這個子文件夾也遍歷查詢一下,true是遍歷查詢,false不是
         *
         *
         * RemoteIterator接口是一個迭代器(有我們查詢出來的所有文件信息),它有兩個方法:
         *      hasNext():判斷有沒有下一個文件
         *      next():獲取下一個文件
         */
        RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path("hdfs://192.168.218.55:9000/test"), true);
        while (files.hasNext()) {
            LocatedFileStatus next = files.next();
            System.out.println(next.getPath());
        }
    }
 
    /**
     * 查詢hdfs文件系統上的某一文件夾下有哪些文件和文件夾,不能遞歸
     */
    @Test
    public void listAll() throws IOException {
        /**
         * listStatus(Path) 代表的是查看當前路徑下有哪些文件和文件夾
         * 返回值:FileStatus[] 文件狀態的一個數組 FileStatus 就代表一個文件的狀態:是文件還是文件夾、路徑、權限等等
         */
        FileStatus[] fileStatuses = fs.listStatus(new Path("hdfs://192.168.218.55:9000/test"));
        for (FileStatus: fileStatuses) {
            /**
             * fileStatus是一個文件狀態對象,里面有很多方法去獲取文件的相關信息
             */
            System.out.println("文件路徑:" + fileStatus.getPath());
            System.out.println("文件權限:" + fileStatus.getPermission());
            System.out.println("文件所屬用戶:" + fileStatus.getOwner());
            System.out.println("文件所屬組:" + fileStatus.getGroup());
            System.out.println("文件大小:" + fileStatus.getLen());
            System.out.println("文件修改時間:" + fileStatus.getModificationTime());
            System.out.println("文件副本數:" + fileStatus.getReplication());
 
            System.out.println("判斷是不是一個文件" + fileStatus.isFile());
            System.out.println("判斷是不是一個目錄" + fileStatus.isDirectory());
        }
    }
 
    /**
     * 銷毀資源
     */
    @After
    public void destroy() {
        if (fs != null) {
            try {
                fs.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

 

遞歸獲取HDFS所有文件

遍歷某一個路徑下的所有文件和文件夾
*      遞歸:
*      如果一個文件夾下有文件的話,那么文件的權限直接打印
*      如果是一個文件夾,那么對這個文件夾遍歷

public class ListPathAllFile {
    public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {
        // hadoop的配置文件,執行的優先級最高
        Configuration conf = new Configuration();
        // 連接文件系統
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.218.55:9000"), conf, "root");
        Path p = new Path("/");
        /**
         * 以前判斷一個文件是file還是目錄,我們使用的是FileStatus里面的isFile() / isDirectory()
         * FileSystem中也提供了兩個方法,用來判斷是文件還是文件夾
         *      isFile(Path path)
         *      isDirectory(Path path)
         */
        listStatus(fs, p);
    }
 
    public static void listStatus(FileSystem fs, Path path) throws IOException {
        if (fs.isFile(path)) {
            System.out.println("有一個文件是:" + path.toString());
        } else if (fs.isDirectory(path)) {
            System.out.println("有一個目錄:" + path.toString());
            FileStatus[] fileStatuses = fs.listStatus(path);
            for (FileStatus fss: fileStatuses) {
                Path path1 = fss.getPath();
                listStatus(fs, path1);
            }
        }
    }
}

 

FileSystem其他方法

  1. createNewFile(Path)創建一個文件
  2. create(Path):根據HDFS上的一個文件file創建一個文件輸出流,然后就可以通過IO流的方式將數據傳輸到本地
  3. isFile(Path)判斷一個文件是不是file
  4. isDirectory(Path)判斷一個文件是不是目錄(文件夾)
  5. exists(Path)查看路徑是否存在
  6. getFileStatus(Path))查看某個文件或者文件夾的狀態
  7. createNewFile(Path)創建一個文件

IO流操作HDFS

利用IO流實現文件的上傳和下載

  • 使用IO流完成文件的上傳

        FileSystem

       create(Path, boolean):根據HDFS文件創建一個文件輸出流,可以實現一個流的數據輸出到HDFS這個文件系統中   

public void testUpload() throws IOException {
        FileInputStream fis = new FileInputStream(new File("E:\\college_data.txt"));
        // 如果想要通過IO流實現文件的上傳,指定overwrite為false,那么hdfs上的文件路徑不能提前存在,否則報錯
        // 如果指定為true(即追加),則不報錯
            FSDataOutputStream = fs.create(new Path("/test/college_data.txt"), false);
        IOUtils.copyBytes(fis, fsDataOutputStream, 1024);
        System.out.println("文件上傳完成");
  }

 

  • 使用IO流完成文件的下載
FileSystemopen():用來返回一個文件對應的輸入流
public void testIODownload() throws IOException {
    FSDataInputStream fis = fs.open(new Path("/test/d.txt"));
    FileOutputStream fos = new FileOutputStream("E:\\b.txt");
    IOUtils.copyBytes(fis, fos, 1024);
    System.out.println("文件下載成功");
}
 
         

利用IO流實現定位文件讀取

NotesHDFS存儲的文件都是以block塊存儲的,一個block塊默認是128M。假設有一個200M的文件上傳到HDFS上,有兩個block快,一個是128M,一個是72M(應該有128M,實際占用了72M

使用IO流實現文件的定位讀取意思就是比如一個文件分成了兩個block快,那么我先下載第一個block塊,之后再下載第二個block塊

public class IOHDFSTest2 {
    public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {
        /**
         * 使用IO流完成文件的定位下載
         * 比如一個文件分成了兩個block快,那么我先下載第一個block塊,之后再下載第二個block塊
         */
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(new URI("hdfs://192.168.218.55:9000"), conf, "root");
        FSDataInputStream open = fs.open(new Path("/test/hadoop-2.8.5.tar.gz"));
        /**
         * 下載第一個block塊:128M
         */
        FileOutputStream fos = new FileOutputStream("E:\\part1.tar.gz");
        byte[] buf = new byte[1024];
        for (int i = 0; i < 128 * 1024; i++) {
            open.read(buf);
            fos.write(buf);
 
        }
 
        /**
         * 下載第二個block塊:從128M開始
         */
        FileOutputStream fos1 = new FileOutputStream("E:\\part2.tar.gz");
        /**
         * 定位到128M的位置,定位之后只用IO流開始下載
         */
        // 定位偏移量
        open.seek(128 * 1024 * 1024);
        IOUtils.copyBytes(open, fos1, 1024);
    }
}

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM