背景:HDFS上的文件大小最好是HDFS塊大小的N倍。如果文件太小,會浪費NameNode的元數據存儲空間以及內存;如果文件分塊不合理,也會影響MapReduce中map的效率。
本例中將小文件的文件名作為key,其內容作為value生成SequenceFile
1、生成文件
//將目標目錄的所有文件以文件名為key,內容為value放入SequenceFile中 //第一個參數是需要打包的目錄,第二個參數生成的文件路徑和名稱 private static void combineToSequenceFile(String[] args) throws IOException { String sourceDir = args[0]; String destFile = args[1]; List<String> files = getFiles(sourceDir); Configuration conf = new Configuration(); FileSystem fs = FileSystem.get(conf); Path destPath = new Path(destFile); if (fs.exists(destPath)) { fs.delete(destPath, true); } FSDataInputStream in = null; Text key = new Text(); BytesWritable value = new BytesWritable(); byte[] buff = new byte[4096]; SequenceFile.Writer writer = null; SequenceFile.Writer.Option option1 = SequenceFile.Writer.file(new Path(destFile)); SequenceFile.Writer.Option option2 = SequenceFile.Writer.keyClass(key.getClass()); SequenceFile.Writer.Option option3 = SequenceFile.Writer.valueClass(value.getClass()); SequenceFile.Writer.Option option4 = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD); try { writer = SequenceFile.createWriter(conf, option1, option2, option3, option4); for (int i = 0; i < files.size(); i++) { Path path = new Path(files.get(i).toString()); System.out.println("讀取文件:" + path.toString()); key = new Text(files.get(i).toString()); in = fs.open(path); // 只能處理小文件,int最大只能表示到1個G的大小,實際上大文件放入SequenceFile也沒有意義 int length = (int) fs.getFileStatus(path).getLen(); byte[] bytes = new byte[length]; // read最多只能讀取65536的大小 int readLength = in.read(buff); int offset = 0; while (readLength > 0) { System.arraycopy(buff, 0, bytes, offset, readLength); offset += readLength; readLength = in.read(buff); } System.out.println("file length:" + length + ",read length:" + offset); value = new BytesWritable(bytes); System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value.getLength()); writer.append(key, value); } } finally { IOUtils.closeStream(in); IOUtils.closeStream(writer); IOUtils.closeStream(fs); } }
查找文件:
private static List<String> getFiles(String dir) throws IOException { Configuration conf = new Configuration(); Path path = new Path(dir); FileSystem fs = null; List<String> filelist = new ArrayList<>(); try { fs = FileSystem.get(conf); //對單個文件或目錄下所有文件和目錄 FileStatus[] fileStatuses = fs.listStatus(path); for (FileStatus fileStatus : fileStatuses) { //遞歸查找子目錄 if (fileStatus.isDirectory()) { filelist.addAll(getFiles(fileStatus.getPath().toString())); } else { filelist.add(fileStatus.getPath().toString()); } } return filelist; } finally { IOUtils.closeStream(fs); } }
2、還原壓縮的SequenceFile文件
//將combineToSequenceFile生成的文件分解成原文件。 private static void extractCombineSequenceFile(String[] args) throws IOException { String sourceFile = args[0]; // String destdir = args[1]; Configuration conf = new Configuration(); Path sourcePath = new Path(sourceFile); SequenceFile.Reader reader = null; SequenceFile.Reader.Option option1 = SequenceFile.Reader.file(sourcePath); Writable key = null; Writable value = null; // Text key = null; // BytesWritable value = null; FileSystem fs = FileSystem.get(conf); try { reader = new SequenceFile.Reader(conf, option1); key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf); value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf); //在知道key和value的明確類型的情況下,可以直接用其類型 // key = ReflectionUtils.newInstance(Text.class, conf); // value = ReflectionUtils.newInstance(BytesWritable.class, conf); long position = reader.getPosition(); while (reader.next(key, value)) { FSDataOutputStream out = fs.create(new Path(key.toString()), true); //文件頭會多出4個字節,用來標識長度,而本例中原文件頭是沒有長度的,所以不能用這個方式寫入流 // value.write(out); out.write(((BytesWritable)value).getBytes(),0,((BytesWritable)value).getLength()); // out.write(value.getBytes(),0,value.getLength()); System.out.printf("[%s]\t%s\t%s\n", position, key, out.getPos()); out.close(); position = reader.getPosition(); } } finally { IOUtils.closeStream(reader); IOUtils.closeStream(fs); } }
