通過Java代碼操作HDFS集群
目錄
連接HDFS文件系統------是必備操作(見二、idea連接HDFS)
引言
要想在Windows上操作HDFS,首先需要在Windows上安裝HDFS。由於Hadoop官網沒有提供Windows下載版本,所以需要對 Hadoop.tar.gz進行兩次解壓(推薦用7-zip軟件),解壓完成后添加相應環境變量:HADOOP_HOME、Path
Idea連接HDFS
第一步:引入HDFS依賴
第一種引入方式(jar包)
自己找jar包然后插入程序中(HDFS編程所需jar包都在Hadoop安裝目錄的share目錄下,此處將jar包歸類了三個文件夾)




導入到idea中:
第二種引入方式(使用maven引用)
maven項目創建后是如下結構:
其中:
src main java:Java源代碼
resource:Java中的一些靜態紫竹院,如文件、圖片、HTML文件等
test Java:專門用來編寫Java Junit單元測試代碼
引入項目依賴的時候:

gav....
scope:引入的依賴jar包的一個作用范圍
runtime:項目運行過程中也要使用
test:項目在測試過程中才能去使用
provided:項目在編譯時和運行時都起作用
maven項目的幾個核心的生命周期:

clean:清楚上一次編譯的結果
compile:編譯源代碼
test:執行maven項目的test包下的單元測試代碼
package:如果test階段測試通過,那么將項目打包成對應的包
install:創建maven項目的時候也指定了當前項目的gav坐標,執行install之后會將項目放入本地的maven倉庫
第二步:idea操作
引入配置文件,指定我們在去連接HDFS的時候我們應該采取什么樣的配置:
import org.apache.hadoop.conf.Configuration; Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.218.55:9000");
這個操作就是配置HDFS文件的core-site.xml

- 根據配置項獲取文件系統
import org.apache.hadoop.fs.FileSystem; FileSystem = FileSystem.get(conf);
此時運行程序,控制台顯示
![]()
說明運行成功,此時可以通過fileSystem實現對HDFS的遠程操作。例如:
(1)創建一個目錄
Path pm = new Path("/test"); fileSystem.mkdirs(pm);
(2)創建一個文件
Path pf = new Path("/a.txt"); fileSystem.create(pf);
Notes:此處進行操作時有可能報的錯誤:

這個錯誤產生的原因是:在 / 這個目錄下只有root用戶能進行 rwx 操作,而用本機的idea進行操作時使用的是Windows的當前用戶,故需要將該目錄設置為:所有用戶都可以進行 rwx 操作。
還可能產生一個錯誤:HADOOP_HOME and Hadoop.home.dir are unset. 這個錯誤是找不到 HADOOP_HOME的配置路徑,如果配置沒問題但是還是出錯的話(有可能是電腦版本型號的問題),可以直接在idea中配置好Hadoop所在目錄:
conf.set("hadoop.home.dir", "d:\\software\\hadoop");
單元測試
在編寫Java代碼的時候,如果想要運行一個Java程序,那么必須創建一個main方法,比較麻煩,比如現在想要測試HDFS的JavaAPI的文件上傳 和文件下載 的功能,那如果使用main方法,我們需要創建兩個Java類,比較復雜,后期找的時候也比較麻煩。因此在Java中提供了一個工具-----Junit單元測試
單元測試是屬於Java的一個測試方法,最直接的表現形式就是在一個Java文件中可以創建多個“main”方法,如果想要使用單元測試,那么必須引入單元測試的jar包
單元測試最大的特點就是可以讓Java中的普通方法擁有main方法的權利。使用方法:在方法前加入注解:@Test / @Before / @After
@Test:注解就是給Java類的普通方法增加main方法的執行權限,並且在運行的時候只會運行當前的這個方法單元,在一個類中可以存在多個方法單元
@Before:此處補充一個概念:代碼塊:有一個特點,在執行構造器時先執行代碼塊。而@Before方法類似於Java中的代碼塊,@Test單元測試方法執行之前必須先執行@Before修飾的方法中的內容。通常做一些前提准備
@After:在@Test修飾的單元測試代碼方法執行完成之后,會調用@After修飾的方法。通常做一些銷毀工作
HDFS的JavaAPI基本操作
Maven依賴准備
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <hadoop.version>2.8.5</hadoop.version> </properties> <!--引入單元測試需要的jar包--> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.13.1</version> <scope>compile</scope> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.19</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hadoop.version}</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-client --> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>${hadoop.version}</version> </dependency> </dependencies>
hdfs文件系統的API使用
注意:這些類都在hadoop包下
- Configuration類:HDFS的配置相關類,等同於hadoop中的*-site.xml中的配置。配置文件在代碼中采用類似於map集合的key-value鍵值對的表現形式。配置文件當中配置的配置項執行的優先級要高於我們在Hadoop軟件中的配置
- FileSystem類:HDFS文件系統對應的Java類,可以通過這個類獲取HDFS上的任何文件和目錄
copyFromLocalFile
copyToLocalFile
moveFromLocalFile
moveToLocalFile
rename
delete
mkdirs
listFiles
listStatus
- Path類:是HDFS上文件在Java中的一個抽象表示,類似於File類
- FileStatus(LocatedFileStatus):里面包含文件的詳情信息(文件路徑、文件權限、文件用戶、文件修改時間、文件大小等)
操作HDFS:
-
連接HDFS文件系統------是必備操作(見二、idea連接HDFS)
-
對HDFS進行操作
- FileSystem
- 文件上傳
命令:moveFromLocal、copyFromLocal、put
javaAPI:copyFromLocalFile(Path, Path):復制文件
moveFromLocalFile(Path, Path):剪切文件 - 文件下載
命令:copyToLocal、moveToLocal、get
JavaAPI:copyToLocalFile(Path, Path)
moveToLocalFile(Path, Path) - 文件刪除
命令:-rm -r -rmdir
JavaAPI:delete(Path, boolean) - 創建文件夾
命令:mkdir -p
JavaAPI:mkdirs(Path) - 更改文件名
命令:mv 文件路徑 更改重命名的文件路徑
JavaAPI:rename(Path, Path) - 查詢某一路徑下的文件,可以遞歸:listFiles
- 查詢某一路徑下文件或文件夾,不可以遞歸:listStatus(常用)
- 文件上傳
- FileStatus(LocatedFileStatus):里面包含了文件的詳情信息:文件路徑、文件權限、文件用戶、文件修改時間、文件大小....
- 案例:
- FileSystem
public class HDFSTestDemo { private FileSystem fs; @Before public void init() { // hdfs的連接配置 Configuration conf = new Configuration(); // 配置hdfs副本數 conf.set("dfs.replication", "1"); // 配置hdfs的地址----就是NameNode的地址,NameNode的地址在core-site.xml文件中配置的fs.defaultFS // conf.set("fs.defaultFS", "hdfs://192.168.218.55:9000"); // 獲取文件系統 try { // fs = FileSystem.get(conf); fs = FileSystem.get(new URI("hdfs://192.168.218.55:9000"), conf, "root"); System.out.println(fs); System.out.println("文件系統獲取成功"); } catch (IOException | URISyntaxException | InterruptedException e) { e.printStackTrace(); } } /** * 文件上傳測試 */ @Test public void upload() { Path localSrc = new Path("C:\\Users\\15336\\Desktop\\mm.txt"); Path linuxDe = new Path("hdfs://192.168.218.55:9000/test"); try { fs.copyFromLocalFile(localSrc, linuxDe); System.out.println("文件上傳成功"); } catch (IOException e) { e.printStackTrace(); } } /** * 文件下載測試 */ @Test public void download() { Path localDst = new Path("E:\\"); Path linuxSrc = new Path("hdfs://192.168.218.55:9000/test/mm.txt"); try { fs.copyToLocalFile(linuxSrc, localDst); System.out.println("文件下載成功"); } catch (IOException e) { e.printStackTrace(); } } /** * 創建目錄 */ @Test public void mkdir() throws IOException { boolean mkdirs = fs.mkdirs(new Path("hdfs://192.168.218.55:9000/test/school")); if (mkdirs) { System.out.println("目錄創建成功"); } } /** * 刪除目錄 */ @Test public void delete() throws IOException { /** * 需要傳入兩個參數 * @params1: 目錄路徑 * @params2: boolean類型的值,代表是否遞歸刪除(如果要刪除多層目錄,則為true) */ boolean delete = fs.delete(new Path("hdfs://192.168.218.55:9000/school"), false); if (delete) { System.out.println("刪除成功"); } } /** * 重命名或更改文件目錄 */ @Test public void rename() throws IOException { boolean rename = fs.rename(new Path("hdfs://192.168.218.55:9000/test/mm.txt"), new Path("hdfs://192.168.218.55:9000/test/vv.txt")); if (rename) { System.out.println("重命名成功"); } } /** * 查看HDFS文件系統上有哪些文件和文件夾 */ @Test public void listFiles() throws IOException { /** * listFiles 只是查詢文件的,目錄不能查詢 * Path:查詢的是哪一個路徑 * boolean:如果這個路徑下有一個子文件夾,那么是否把這個子文件夾也遍歷查詢一下,true是遍歷查詢,false不是 * * * RemoteIterator接口是一個迭代器(有我們查詢出來的所有文件信息),它有兩個方法: * hasNext():判斷有沒有下一個文件 * next():獲取下一個文件 */ RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path("hdfs://192.168.218.55:9000/test"), true); while (files.hasNext()) { LocatedFileStatus next = files.next(); System.out.println(next.getPath()); } } /** * 查詢hdfs文件系統上的某一文件夾下有哪些文件和文件夾,不能遞歸 */ @Test public void listAll() throws IOException { /** * listStatus(Path) 代表的是查看當前路徑下有哪些文件和文件夾 * 返回值:FileStatus[] 文件狀態的一個數組 FileStatus 就代表一個文件的狀態:是文件還是文件夾、路徑、權限等等 */ FileStatus[] fileStatuses = fs.listStatus(new Path("hdfs://192.168.218.55:9000/test")); for (FileStatus: fileStatuses) { /** * fileStatus是一個文件狀態對象,里面有很多方法去獲取文件的相關信息 */ System.out.println("文件路徑:" + fileStatus.getPath()); System.out.println("文件權限:" + fileStatus.getPermission()); System.out.println("文件所屬用戶:" + fileStatus.getOwner()); System.out.println("文件所屬組:" + fileStatus.getGroup()); System.out.println("文件大小:" + fileStatus.getLen()); System.out.println("文件修改時間:" + fileStatus.getModificationTime()); System.out.println("文件副本數:" + fileStatus.getReplication()); System.out.println("判斷是不是一個文件" + fileStatus.isFile()); System.out.println("判斷是不是一個目錄" + fileStatus.isDirectory()); } } /** * 銷毀資源 */ @After public void destroy() { if (fs != null) { try { fs.close(); } catch (IOException e) { e.printStackTrace(); } } } }
遞歸獲取HDFS所有文件
遍歷某一個路徑下的所有文件和文件夾
* 遞歸:
* 如果一個文件夾下有文件的話,那么文件的權限直接打印
* 如果是一個文件夾,那么對這個文件夾遍歷
public class ListPathAllFile { public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException { // hadoop的配置文件,執行的優先級最高 Configuration conf = new Configuration(); // 連接文件系統 FileSystem fs = FileSystem.get(new URI("hdfs://192.168.218.55:9000"), conf, "root"); Path p = new Path("/"); /** * 以前判斷一個文件是file還是目錄,我們使用的是FileStatus里面的isFile() / isDirectory() * FileSystem中也提供了兩個方法,用來判斷是文件還是文件夾 * isFile(Path path) * isDirectory(Path path) */ listStatus(fs, p); } public static void listStatus(FileSystem fs, Path path) throws IOException { if (fs.isFile(path)) { System.out.println("有一個文件是:" + path.toString()); } else if (fs.isDirectory(path)) { System.out.println("有一個目錄:" + path.toString()); FileStatus[] fileStatuses = fs.listStatus(path); for (FileStatus fss: fileStatuses) { Path path1 = fss.getPath(); listStatus(fs, path1); } } } }
FileSystem其他方法
- createNewFile(Path):創建一個文件
- create(Path):根據HDFS上的一個文件file創建一個文件輸出流,然后就可以通過IO流的方式將數據傳輸到本地
- isFile(Path):判斷一個文件是不是file
- isDirectory(Path):判斷一個文件是不是目錄(文件夾)
- exists(Path):查看路徑是否存在
- getFileStatus(Path)):查看某個文件或者文件夾的狀態
- createNewFile(Path):創建一個文件
IO流操作HDFS
利用IO流實現文件的上傳和下載
- 使用IO流完成文件的上傳
FileSystem
create(Path, boolean):根據HDFS文件創建一個文件輸出流,可以實現一個流的數據輸出到HDFS這個文件系統中
public void testUpload() throws IOException { FileInputStream fis = new FileInputStream(new File("E:\\college_data.txt")); // 如果想要通過IO流實現文件的上傳,指定overwrite為false,那么hdfs上的文件路徑不能提前存在,否則報錯 // 如果指定為true(即追加),則不報錯 FSDataOutputStream = fs.create(new Path("/test/college_data.txt"), false); IOUtils.copyBytes(fis, fsDataOutputStream, 1024); System.out.println("文件上傳完成"); }
- 使用IO流完成文件的下載
FileSystem:open():用來返回一個文件對應的輸入流
public void testIODownload() throws IOException { FSDataInputStream fis = fs.open(new Path("/test/d.txt")); FileOutputStream fos = new FileOutputStream("E:\\b.txt"); IOUtils.copyBytes(fis, fos, 1024); System.out.println("文件下載成功"); }
利用IO流實現定位文件讀取
Notes:HDFS存儲的文件都是以block塊存儲的,一個block塊默認是128M。假設有一個200M的文件上傳到HDFS上,有兩個block快,一個是128M,一個是72M(應該有128M,實際占用了72M)
使用IO流實現文件的定位讀取意思就是比如一個文件分成了兩個block快,那么我先下載第一個block塊,之后再下載第二個block塊
public class IOHDFSTest2 { public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException { /** * 使用IO流完成文件的定位下載 * 比如一個文件分成了兩個block快,那么我先下載第一個block塊,之后再下載第二個block塊 */ Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(new URI("hdfs://192.168.218.55:9000"), conf, "root"); FSDataInputStream open = fs.open(new Path("/test/hadoop-2.8.5.tar.gz")); /** * 下載第一個block塊:128M */ FileOutputStream fos = new FileOutputStream("E:\\part1.tar.gz"); byte[] buf = new byte[1024]; for (int i = 0; i < 128 * 1024; i++) { open.read(buf); fos.write(buf); } /** * 下載第二個block塊:從128M開始 */ FileOutputStream fos1 = new FileOutputStream("E:\\part2.tar.gz"); /** * 定位到128M的位置,定位之后只用IO流開始下載 */ // 定位偏移量 open.seek(128 * 1024 * 1024); IOUtils.copyBytes(open, fos1, 1024); } }
