The complete program is as follows:
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HDFSTest {

    // Create a file at the given location and write a string into it
    public static void WriteToHDFS(String file, String words) throws IOException, URISyntaxException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(file), conf);
        Path path = new Path(file);
        FSDataOutputStream out = fs.create(path);   // create the file
        // Both methods below write to the file (so the string ends up written twice here);
        // the latter is generally the more common choice
        out.writeBytes(words);
        out.write(words.getBytes("UTF-8"));
        out.close();
        // To write from an input stream, or to copy one file into another
        // (open the existing file with an input stream), use IOUtils.copyBytes,
        // as in the standalone sketch after this program:
        // FSDataInputStream in = fs.open(new Path(args[0]));
        // IOUtils.copyBytes(in, out, 4096, true);
        // 4096 is the copy buffer size; true closes the streams when the copy finishes
    }

    public static void ReadFromHDFS(String file) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(file), conf);
        Path path = new Path(file);
        FSDataInputStream in = fs.open(path);
        IOUtils.copyBytes(in, System.out, 4096, true);
        // Alternatively, FSDataInputStream's read methods load the file content into a byte array:
        /**
         * FileStatus stat = fs.getFileStatus(path);
         * // create the buffer
         * byte[] buffer = new byte[(int) stat.getLen()];
         * in.readFully(0, buffer);
         * in.close();
         * fs.close();
         * return buffer;
         */
    }

    public static void DeleteHDFSFile(String file) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(file), conf);
        Path path = new Path(file);
        // The FileSystem delete API offers several methods: deleteOnExit() removes the path
        // when the JVM exits; the call below deletes recursively when the path is a directory
        fs.delete(path, true);
        fs.close();
    }

    public static void UploadLocalFileHDFS(String src, String dst) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        Path pathDst = new Path(dst);
        Path pathSrc = new Path(src);
        fs.copyFromLocalFile(pathSrc, pathDst);
        fs.close();
    }

    public static void ListDirAll(String DirFile) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(DirFile), conf);
        Path path = new Path(DirFile);
        FileStatus[] status = fs.listStatus(path);
        // Method 1
        for (FileStatus f : status) {
            System.out.println(f.getPath().toString());
        }
        // Method 2
        Path[] listedPaths = FileUtil.stat2Paths(status);
        for (Path p : listedPaths) {
            System.out.println(p.toString());
        }
    }

    public static void main(String[] args) throws IOException, URISyntaxException {
        // List all files under the directory
        ListDirAll("hdfs://ubuntu:9000/user/kqiao");

        String fileWrite = "hdfs://ubuntu:9000/user/kqiao/test/FileWrite";
        String words = "These words are to be written into the file!\n";
        WriteToHDFS(fileWrite, words);
        // Read fileWrite back and print its content to the terminal
        ReadFromHDFS(fileWrite);
        // Delete the fileWrite file created above
        DeleteHDFSFile(fileWrite);

        // Assuming a local file named uploadFile exists, upload it to HDFS:
        // String LocalFile = "file:///home/kqiao/hadoop/MyHadoopCodes/uploadFile";
        // UploadLocalFileHDFS(LocalFile, fileWrite);
    }
}
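The commented-out IOUtils.copyBytes hint inside WriteToHDFS can be turned into a standalone copy routine. The sketch below is a minimal, hypothetical helper (the class name HDFSCopyExample, the method name CopyHDFSFile, and the 4096-byte buffer size are illustrative choices, not part of the original program):

import java.io.IOException;
import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class HDFSCopyExample {
    // Hypothetical helper: copy src (an existing HDFS file) into dst on the same cluster
    public static void CopyHDFSFile(String src, String dst) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(src), conf);
        FSDataInputStream in = fs.open(new Path(src));       // open the existing source file
        FSDataOutputStream out = fs.create(new Path(dst));   // create (or overwrite) the destination
        // 4096 is the copy buffer size; true closes both streams when the copy finishes
        IOUtils.copyBytes(in, out, 4096, true);
        fs.close();
    }
}

Because the last argument to IOUtils.copyBytes is true, both streams are closed automatically after the copy, so no separate in.close()/out.close() calls are needed.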
FSDataOutputStream os = hdfs.create(new Path(args[0]));
Note: os.flush() flushes the data stream.
Sometimes a file being written is not immediately visible to other readers: other readers can only see the first block once more than a full block has been written, and they still cannot see the block currently being written. out.sync() can be used to force all buffers to synchronize with the datanodes. In fact, every os.close() implicitly performs a sync().
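As a minimal sketch of the visibility point above, assuming a Hadoop 2.x or later client where FSDataOutputStream exposes hflush() and hsync() (the newer replacements for the older sync() call mentioned here); the path and record strings are made up for illustration:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSFlushExample {
    public static void main(String[] args) throws Exception {
        String file = "hdfs://ubuntu:9000/user/kqiao/test/FlushDemo";  // illustrative path
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(file), conf);
        FSDataOutputStream out = fs.create(new Path(file));
        out.writeBytes("first record\n");
        // hflush(): push buffered data to the datanodes so that new readers can see it
        out.hflush();
        out.writeBytes("second record\n");
        // close() flushes and finalizes the file, so no explicit sync is needed here
        out.close();
        fs.close();
    }
}

hflush() only makes the data visible to new readers; hsync() additionally asks the datanodes to persist the data to disk.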