hadoop 將HDFS上多個小文件合並到SequenceFile里


背景:hdfs上的文件大小最好是hdfs塊大小的N倍。如果文件太小,會浪費namenode的元數據存儲空間以及內存;如果文件分塊不合理,也會影響mapreduce中map的效率。

本例中將小文件的文件名作為key,其內容作為value生成SequenceFile

1、生成文件

 //將目標目錄的所有文件以文件名為key,內容為value放入SequenceFile中
    //第一個參數是需要打包的目錄,第二個參數生成的文件路徑和名稱
    /**
     * Packs every file found under a source directory into a single SequenceFile.
     *
     * <p>Each input file becomes one record: the key is the file's path string as returned by
     * {@code getFiles}, and the value is the file's complete content. Records are written with
     * record-level compression.
     *
     * <p>Only suitable for small files: each file is loaded entirely into one byte array, so a
     * file larger than {@code Integer.MAX_VALUE} bytes is rejected instead of being truncated.
     *
     * @param args {@code args[0]} is the directory to pack; {@code args[1]} is the path of the
     *             SequenceFile to create (an existing file or directory at that path is deleted)
     * @throws IOException if listing, reading or writing fails, if an input file is too large to
     *                     hold in memory as one array, or if an input file ends before the length
     *                     reported by its file status
     */
    private static void combineToSequenceFile(String[] args) throws IOException {
        String sourceDir = args[0];
        String destFile = args[1];

        List<String> files = getFiles(sourceDir);

        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        try {
            Path destPath = new Path(destFile);
            if (fs.exists(destPath)) {
                fs.delete(destPath, true);
            }

            SequenceFile.Writer.Option fileOption = SequenceFile.Writer.file(destPath);
            SequenceFile.Writer.Option keyOption = SequenceFile.Writer.keyClass(Text.class);
            SequenceFile.Writer.Option valueOption = SequenceFile.Writer.valueClass(BytesWritable.class);
            SequenceFile.Writer.Option compressionOption =
                    SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD);

            // try-with-resources so that a failure while closing (i.e. flushing) the writer is
            // reported to the caller instead of being silently swallowed.
            try (SequenceFile.Writer writer = SequenceFile.createWriter(
                    conf, fileOption, keyOption, valueOption, compressionOption)) {
                for (String file : files) {
                    Path path = new Path(file);
                    System.out.println("讀取文件:" + path.toString());

                    long fileLength = fs.getFileStatus(path).getLen();
                    // A Java array holds at most Integer.MAX_VALUE (~2 GiB) elements; fail loudly
                    // rather than let the narrowing cast wrap around.
                    if (fileLength > Integer.MAX_VALUE) {
                        throw new IOException("File too large to pack into a single record ("
                                + fileLength + " bytes): " + path);
                    }
                    int length = (int) fileLength;
                    byte[] bytes = new byte[length];

                    int offset = 0;
                    // One stream per input file, closed as soon as that file has been read.
                    try (FSDataInputStream in = fs.open(path)) {
                        // A single read() may return fewer bytes than requested, so keep reading
                        // straight into the target array until it is full or the stream ends.
                        while (offset < length) {
                            int readLength = in.read(bytes, offset, length - offset);
                            if (readLength <= 0) {
                                break;
                            }
                            offset += readLength;
                        }
                    }
                    System.out.println("file length:" + length + ",read length:" + offset);
                    if (offset < length) {
                        throw new IOException("Unexpected end of file " + path + ": expected "
                                + length + " bytes but read " + offset);
                    }

                    Text key = new Text(file);
                    BytesWritable value = new BytesWritable(bytes);
                    System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value.getLength());
                    writer.append(key, value);
                }
            }
        } finally {
            IOUtils.closeStream(fs);
        }

    }

查找文件:

    /**
     * Recursively lists every non-directory entry under {@code dir}.
     *
     * <p>The FileSystem is looked up once and reused for the whole traversal. It is deliberately
     * not closed here: {@code FileSystem.get} normally hands out a shared, cached instance, and
     * closing it from a helper (previously done at every recursion level) would invalidate the
     * handle for any other code holding the same instance.
     *
     * @param dir a directory (or single file) path on the default file system
     * @return the path strings of all files found, in depth-first listing order
     * @throws IOException if a directory listing fails
     */
    private static List<String> getFiles(String dir) throws IOException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        List<String> filelist = new ArrayList<>();
        collectFilesRecursively(fs, new Path(dir), filelist);
        return filelist;
    }

    /** Appends the path string of every file under {@code dir} to {@code sink}, descending into subdirectories. */
    private static void collectFilesRecursively(FileSystem fs, Path dir, List<String> sink)
            throws IOException {
        // listStatus accepts either a single file or a directory (returning its direct children).
        for (FileStatus fileStatus : fs.listStatus(dir)) {
            if (fileStatus.isDirectory()) {
                collectFilesRecursively(fs, fileStatus.getPath(), sink);
            } else {
                sink.add(fileStatus.getPath().toString());
            }
        }
    }

2、還原壓縮的SequenceFile文件

    /**
     * Splits a SequenceFile produced by {@code combineToSequenceFile} back into individual files.
     *
     * <p>Every record is written to the path stored in its key, overwriting any existing file
     * there, with the record's value bytes as the file content.
     *
     * @param args {@code args[0]} is the path of the SequenceFile to unpack
     * @throws IOException if reading or writing fails, or if the SequenceFile's value class is
     *                     not {@code BytesWritable}
     */
    private static void extractCombineSequenceFile(String[] args) throws IOException {
        String sourceFile = args[0];
        Configuration conf = new Configuration();
        Path sourcePath = new Path(sourceFile);

        FileSystem fs = FileSystem.get(conf);
        try (SequenceFile.Reader reader =
                     new SequenceFile.Reader(conf, SequenceFile.Reader.file(sourcePath))) {
            // Instantiate key/value holders from the classes recorded in the file header.
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            if (!(value instanceof BytesWritable)) {
                throw new IOException("Unsupported value class " + reader.getValueClass().getName()
                        + " in " + sourcePath + "; expected " + BytesWritable.class.getName());
            }
            BytesWritable content = (BytesWritable) value;

            long position = reader.getPosition();
            while (reader.next(key, content)) {
                // try-with-resources closes the output file even when the write fails.
                try (FSDataOutputStream out = fs.create(new Path(key.toString()), true)) {
                    // Do not use value.write(out): it prepends a 4-byte length field that the
                    // original file never had. Write only the valid range, because the backing
                    // array returned by getBytes() may be longer than getLength().
                    out.write(content.getBytes(), 0, content.getLength());
                    System.out.printf("[%s]\t%s\t%s\n", position, key, out.getPos());
                }
                position = reader.getPosition();
            }
        } finally {
            IOUtils.closeStream(fs);
        }
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM