最近好久沒有更新博客了,因為最近的工作鞋的代碼都是自己不怎么熟悉的領域的,所以感覺這些代碼寫的有點困難。今天特此寫這個博客把自己最近研究的東西稍作總結。
工作的需求是,在HDFS上有每天不斷產生的的日志文件文件夾,每一個文件夾下都有兩個文件,一個是.log文件,還有一個是.out文件。現在要求根據日志產生的時間,按照天計算,將同一天產生的文件夾打包壓縮成一個文件歸檔在 HDFS某個特定的目錄下。操作HDFS上的文件當然就不能java自帶的那一套操作文件的方式去處理了。事實上,Hadoop提供了一套api為java來操作hdfs,下面我們來看看具體是怎么實現的。
第一步,獲取HDFS的FileSystem對象,對所有在HDFS中的文件的操作,都要使用這個對象提供的方法去操作,需要添加當前項目下的hadoop的兩個配置文件。
1 public static FileSystem getFileSystem() throws IOException { 2 Configuration conf = new Configuration(); 3 conf.addResource(new Path("conf/core-site.xml")); 4 conf.addResource(new Path("conf/hdfs-site.xml")); 5 conf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem"); 6 return FileSystem.get(conf); 7 }
第二步,獲取HDFS某文件夾下所有的文件信息:
1 FileStatus[] stats = fs.listStatus(path); 2 for (FileStatus stat : stats) { 3 long time = stat.getModificationTime(); //文件的最后編輯時間 4 String filePath = stat.getPath().toString(); //文件的全路徑 5 String fileName = stat.getPath().getName(); //文件名 6 boolean isFile = stat.isFile(); //路徑是否是文件 7 boolean isDir = stat.isDirectory(); //路徑是否是目錄 8 }
第三步,得到某一個文件的輸入流 ,使用hdfs提供的流:
1 FSDataInputStream HDFSIn = fs.open(new Path(fileStatus[j].getPath().toString())); //getPath():文件的完整路徑 2 BufferedInputStream bis = new BufferedInputStream(HDFSIn);
第四步,在文件經過相應的處理之后,向HDFS寫文件:
1 byte[] buf = new byte[1024]; 2 FSDataOutputStream fsDataOutputStream = getFileSystem().create(new Path(“HDFDPath”)); 3 fsDataOutputStream.write(buf);
第五步,刪除沒用的文件(謹慎操作!):
1 FileSystem fs = getFileSystem(); 2 fs.delete(new Path(file), true); //true表示非空目錄和文件會被刪除,通過java刪除的文件將會被永久刪除,不會被放入HDFS的回收站
作此總結,希望自己再接再厲。
附上HDFS常用命令:
命令 | 作用 |
|
列出hdfs文件系統根目錄下的目錄和文件 |
hadoop fs -put < local file > < hdfs file > |
向HDFS添加本地文件 |
hadoop fs -get < hdfs file > < local file or dir> |
拷貝文件至本地 |
|
刪除文件或者目錄 |
hadoop fs -mkdir < hdfs path> |
創建目錄 |
hadoop fs -cp < hdfs file > < hdfs file > |
給文件重命名並且保存,源文件還在 |
hadoop fs -mv < hdfs file > < hdfs file > |
給文件重命名並且保存,源文件不在 |
hadoop fs -count < hdfs path > |
統計目錄下文件個數,目錄個數,文件總大小 |
hadoop fs -tail < hdfs file > |
顯示文件末尾1KB的數據 |
|
均衡內部dataNode中的文件 |