HDFS是一個分布式文件系統,既然是文件系統,就可以對其文件進行操作,比如說新建文件、刪除文件、讀取文件內容等操作。下面記錄一下使用JAVA API對HDFS中的文件進行操作的過程。
對分HDFS中的文件操作主要涉及一下幾個類:
Configuration類:該類的對象封轉了客戶端或者服務器的配置。
FileSystem類:該類的對象是一個文件系統對象,可以用該對象的一些方法來對文件進行操作。FileSystem fs = FileSystem.get(conf);通過FileSystem的靜態方法get獲得該對象。
FSDataInputStream和FSDataOutputStream:這兩個類是HDFS中的輸入輸出流。分別通過FileSystem的open方法和create方法獲得。
具體如何對文件操作清下下面例子:
1 package com.hdfs; 2 3 import java.io.FileInputStream; 4 import java.io.IOException; 5 import java.io.InputStream; 6 7 import org.apache.hadoop.conf.Configuration; 8 import org.apache.hadoop.fs.FSDataOutputStream; 9 import org.apache.hadoop.fs.FileStatus; 10 import org.apache.hadoop.fs.FileSystem; 11 import org.apache.hadoop.fs.Path; 12 import org.apache.hadoop.io.IOUtils; 13 14 public class HdfsTest { 15 16 //創建新文件 17 public static void createFile(String dst , byte[] contents) throws IOException{ 18 Configuration conf = new Configuration(); 19 FileSystem fs = FileSystem.get(conf); 20 Path dstPath = new Path(dst); //目標路徑 21 //打開一個輸出流 22 FSDataOutputStream outputStream = fs.create(dstPath); 23 outputStream.write(contents); 24 outputStream.close(); 25 fs.close(); 26 System.out.println("文件創建成功!"); 27 } 28 29 //上傳本地文件 30 public static void uploadFile(String src,String dst) throws IOException{ 31 Configuration conf = new Configuration(); 32 FileSystem fs = FileSystem.get(conf); 33 Path srcPath = new Path(src); //原路徑 34 Path dstPath = new Path(dst); //目標路徑 35 //調用文件系統的文件復制函數,前面參數是指是否刪除原文件,true為刪除,默認為false 36 fs.copyFromLocalFile(false,srcPath, dstPath); 37 38 //打印文件路徑 39 System.out.println("Upload to "+conf.get("fs.default.name")); 40 System.out.println("------------list files------------"+"\n"); 41 FileStatus [] fileStatus = fs.listStatus(dstPath); 42 for (FileStatus file : fileStatus) 43 { 44 System.out.println(file.getPath()); 45 } 46 fs.close(); 47 } 48 49 //文件重命名 50 public static void rename(String oldName,String newName) throws IOException{ 51 Configuration conf = new Configuration(); 52 FileSystem fs = FileSystem.get(conf); 53 Path oldPath = new Path(oldName); 54 Path newPath = new Path(newName); 55 boolean isok = fs.rename(oldPath, newPath); 56 if(isok){ 57 System.out.println("rename ok!"); 58 }else{ 59 System.out.println("rename failure"); 60 } 61 fs.close(); 62 } 63 //刪除文件 64 public static void delete(String filePath) throws IOException{ 65 Configuration conf = new Configuration(); 66 FileSystem fs = FileSystem.get(conf); 67 Path path = new Path(filePath); 68 boolean isok = fs.deleteOnExit(path); 69 if(isok){ 70 System.out.println("delete ok!"); 71 }else{ 72 System.out.println("delete failure"); 73 } 74 fs.close(); 75 } 76 77 //創建目錄 78 public static void mkdir(String path) throws IOException{ 79 Configuration conf = new Configuration(); 80 FileSystem fs = FileSystem.get(conf); 81 Path srcPath = new Path(path); 82 boolean isok = fs.mkdirs(srcPath); 83 if(isok){ 84 System.out.println("create dir ok!"); 85 }else{ 86 System.out.println("create dir failure"); 87 } 88 fs.close(); 89 } 90 91 //讀取文件的內容 92 public static void readFile(String filePath) throws IOException{ 93 Configuration conf = new Configuration(); 94 FileSystem fs = FileSystem.get(conf); 95 Path srcPath = new Path(filePath); 96 InputStream in = null; 97 try { 98 in = fs.open(srcPath); 99 IOUtils.copyBytes(in, System.out, 4096, false); //復制到標准輸出流 100 } finally { 101 IOUtils.closeStream(in); 102 } 103 } 104 105 106 public static void main(String[] args) throws IOException { 107 //測試上傳文件 108 //uploadFile("D:\\c.txt", "/user/hadoop/test/"); 109 //測試創建文件 110 /*byte[] contents = "hello world 世界你好\n".getBytes(); 111 createFile("/user/hadoop/test1/d.txt",contents);*/ 112 //測試重命名 113 //rename("/user/hadoop/test/d.txt", "/user/hadoop/test/dd.txt"); 114 //測試刪除文件 115 //delete("test/dd.txt"); //使用相對路徑 116 //delete("test1"); //刪除目錄 117 //測試新建目錄 118 //mkdir("test1"); 119 //測試讀取文件 120 readFile("test1/d.txt"); 121 } 122 123 }