The implementation code is as follows:
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HDFSTest {

    // Create a new file at the given location and write a string into it
    public static void WriteToHDFS(String file, String words) throws IOException, URISyntaxException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(file), conf);
        Path path = new Path(file);
        FSDataOutputStream out = fs.create(path); // create the file

        // Both methods write to the file; the latter is generally preferred.
        // Note that calling both, as here, writes the string twice.
        out.writeBytes(words);
        out.write(words.getBytes("UTF-8"));

        out.close();
        // To write from an input stream, or to copy one file to another
        // (opening the existing file with an input stream), use IOUtils.copyBytes:
        // FSDataInputStream in = fs.open(new Path(args[0]));
        // IOUtils.copyBytes(in, out, 4096, true); // 4096 is the buffer size; true closes the streams when done
    }

    public static void ReadFromHDFS(String file) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(file), conf);
        Path path = new Path(file);
        FSDataInputStream in = fs.open(path);

        IOUtils.copyBytes(in, System.out, 4096, true);
        // Alternatively, FSDataInputStream's read methods load the file content into a byte array:
        /**
         * FileStatus stat = fs.getFileStatus(path);
         * // create the buffer
         * byte[] buffer = new byte[(int) stat.getLen()];
         * in.readFully(0, buffer);
         * in.close();
         * fs.close();
         * return buffer;
         */
    }

    public static void DeleteHDFSFile(String file) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(file), conf);
        Path path = new Path(file);
        // The FileSystem delete API offers several methods: deleteOnExit deletes the path when
        // the JVM exits; the call below deletes recursively when the path is a directory.
        fs.delete(path, true);
        fs.close();
    }

    public static void UploadLocalFileHDFS(String src, String dst) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        Path pathDst = new Path(dst);
        Path pathSrc = new Path(src);

        fs.copyFromLocalFile(pathSrc, pathDst);
        fs.close();
    }

    public static void ListDirAll(String DirFile) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(DirFile), conf);
        Path path = new Path(DirFile);

        FileStatus[] status = fs.listStatus(path);
        // Approach 1
        for (FileStatus f : status) {
            System.out.println(f.getPath().toString());
        }
        // Approach 2
        Path[] listedPaths = FileUtil.stat2Paths(status);
        for (Path p : listedPaths) {
            System.out.println(p.toString());
        }
    }

    public static void main(String[] args) throws IOException, URISyntaxException {
        // List all files under the directory
        ListDirAll("hdfs://ubuntu:9000/user/kqiao");

        String fileWrite = "hdfs://ubuntu:9000/user/kqiao/test/FileWrite";
        String words = "These words are written into the file!\n";
        WriteToHDFS(fileWrite, words);
        // Read the content of fileWrite and print it to the terminal
        ReadFromHDFS(fileWrite);
        // Delete the fileWrite file created above
        DeleteHDFSFile(fileWrite);
        // Assuming a local file uploadFile exists, upload it to HDFS:
        // String LocalFile = "file:///home/kqiao/hadoop/MyHadoopCodes/uploadFile";
        // UploadLocalFileHDFS(LocalFile, fileWrite);
    }
}
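The commented-out IOUtils.copyBytes path in WriteToHDFS can also be pulled out into a standalone copy helper. Below is a minimal sketch of that idea; the method name CopyHDFSFile and the way the streams are opened are my own illustration, not part of the original example:

// Sketch: copy one HDFS file to another with IOUtils.copyBytes.
// CopyHDFSFile is a hypothetical helper name.
public static void CopyHDFSFile(String src, String dst) throws IOException {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(src), conf);
    FSDataInputStream in = fs.open(new Path(src));      // open the existing source file
    FSDataOutputStream out = fs.create(new Path(dst));  // create the destination file
    // 4096 is the per-copy buffer size; true closes both streams when the copy finishes
    IOUtils.copyBytes(in, out, 4096, true);
}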
FSDataOutputStream os = hdfs.create(new Path(args[0]));
Note: os.flush() flushes the data stream. A file that is being written is sometimes not immediately visible to other readers: only after more than one block has been written can other readers see the first block, and they still cannot see the block currently being written. Calling os.sync() (named hflush() in newer Hadoop releases) forces all client-side buffers to be synchronized with the datanodes. In fact, every os.close() implicitly includes a sync() call.
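As a minimal sketch of forcing visibility during a long write (assuming Hadoop 2.x, where hflush() is the successor of the deprecated sync(); the path below is hypothetical):

// Sketch: make partially written data visible to other readers before close().
FSDataOutputStream os = fs.create(new Path("/user/kqiao/visible-demo"));  // hypothetical path
os.writeBytes("first batch\n");
os.hflush();   // push client buffers to the datanodes; readers can now see "first batch"
os.writeBytes("second batch\n");
os.close();    // close() flushes the remaining data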
