Hadoop:讀取hdfs上zip壓縮包並解壓到hdfs的實現代碼


背景:

目前工作中遇到一大批的數據,如果不壓縮直接上傳到ftp上就會遇到ftp空間資源不足問題,沒辦法只能壓縮后上傳,上穿完成后在linux上下載。但是linux客戶端的資源只有20G左右一個壓縮包解壓后就要占用16G左右的空間,因此想在linux上直接解壓已經太折騰了(因為我們一共需要處理的這樣的壓縮包包含有30個左右)。

解決方案:

先把linux上下載到的zip壓縮包上傳到hdfs,等待所有zip壓縮包都上傳完成后,開始使用程序直接在讀取hdfs上的壓縮包文件,直接解壓到hdfs上,之后把解壓后的文件壓縮為gzip,實現代碼如下(參考:http://www.cnblogs.com/juefan/articles/2935163.html):

import java.io.File;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;

/**
 * Created by Administrator on 12/10/2017.
 */
public class ConvertHdfsZipFileToGzipFile {
    public static boolean isRecur = false;

    public static void main(String[] args) throws IOException {
        if (args.length == 0)
            errorMessage("1filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>");
        if (args[0].matches("^-[rR]$")) {
            isRecur = true;
        }
        if ((isRecur && args.length != 4) || ( !isRecur && args.length != 3)) {
            errorMessage("2filesmerge [-r|-R] <hdfsTargetDir> <hdfsFileName>");
        }

        Configuration conf = new Configuration();
        FileSystem hdfs = FileSystem.get(conf);

        Path inputDir;
        Path hdfsFile;
        Text pcgroupText;

        // hadoop jar myjar.jar ConvertHdfsZipFileToGzipFile -r /zip/(待轉換文件路徑,在HDFS上) /user/j/pconline/(轉換完成后的文件存儲地址,也在HDFS上) pconline(待轉換的文件名包含的字符)
        if(isRecur){
            inputDir = new Path(args[1]);
            hdfsFile = new Path(args[2]);
            pcgroupText = new Text(args[3]);
        }
        // hadoop jar myjar.jar ConvertHdfsZipFileToGzipFile /zip/(待轉換文件路徑,在HDFS上) /user/j/pconline/(轉換完成后的文件存儲地址,也在HDFS上) pconline(待轉換的文件名包含的字符)
        else{
            inputDir = new Path(args[0]);
            hdfsFile = new Path(args[1]);
            pcgroupText = new Text(args[2]);
        }

        if (!hdfs.exists(inputDir)) {
            errorMessage("3hdfsTargetDir not exist!");
        }
        if (hdfs.exists(hdfsFile)) {
            errorMessage("4hdfsFileName exist!");
        }
        merge(inputDir, hdfsFile, hdfs, pcgroupText);
        System.exit(0);
    }

    /**
     * @author 
     * @param inputDir zip文件的存儲地址
     * @param hdfsFile 解壓結果的存儲地址
     * @param hdfs 分布式文件系統數據流
     * @param pcgroupText 需要解壓縮的文件關鍵名
     */
    public static void merge(Path inputDir, Path hdfsFile,
                             FileSystem hdfs, Text pcgroupText) {
        try {
            //文件系統地址inputDir下的FileStatus
            FileStatus[] inputFiles = hdfs.listStatus(inputDir);
            for (int i = 0; i < inputFiles.length; i++) {
                if (!hdfs.isFile(inputFiles[i].getPath())) {
                    if (isRecur){
                        merge(inputFiles[i].getPath(), hdfsFile, hdfs,pcgroupText);
                        return ;
                    }
                    else {
                        System.out.println(inputFiles[i].getPath().getName()
                                + "is not file and not allow recursion, skip!");
                        continue;
                    }
                }
                //判斷文件名是否在需要解壓縮的關鍵名內
                if(inputFiles[i].getPath().getName().contains(pcgroupText.toString()) == true){
                    //輸出待解壓的文件名
                    System.out.println(inputFiles[i].getPath().getName());
                    //將數據流指向待解壓文件
                    FSDataInputStream in = hdfs.open(inputFiles[i].getPath());
                    /**
                     *數據的解壓執行過程
                     */
                    ZipInputStream zipInputStream = null;
                    try{
                        zipInputStream = new ZipInputStream(in);
                        ZipEntry entry;
                        //解壓后有多個文件一並解壓出來並實現合並
                        //合並后的地址
                        FSDataOutputStream mergerout = hdfs.create(new Path(hdfsFile + File.separator +
                                inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf("."))));
                        while((entry = zipInputStream.getNextEntry()) != null){
                            int bygeSize1=2*1024*1024;
                            byte[] buffer1 = new byte[bygeSize1];
                            int nNumber;
                            while((nNumber = zipInputStream.read(buffer1,0, bygeSize1)) != -1){
                                mergerout.write(buffer1, 0, nNumber);
                            }
                        }
                        
                        mergerout.flush();
                        mergerout.close();
                        zipInputStream.close();
                    }catch(IOException e){
                        continue;
                    }
                    in.close();
                    /**
                     *將解壓合並后的數據壓縮成gzip格式
                     */
                    GZIPOutputStream gzipOutputStream = null;
                    try{
                        FSDataOutputStream outputStream = null;
                        outputStream = hdfs.create(new Path(hdfsFile + File.separator +
                                inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf(".")) + ".gz"));
                        FSDataInputStream inputStream = null;
                        gzipOutputStream = new GZIPOutputStream(outputStream);
                        inputStream = hdfs.open(new Path(hdfsFile + File.separator + inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf("."))));
                        int bygeSize=2*1024*1024;
                        byte[] buffer = new byte[bygeSize];
                        int len;
                        while((len = inputStream.read(buffer)) > 0){
                            gzipOutputStream.write(buffer, 0, len);
                        }
                        inputStream.close();
                        gzipOutputStream.finish();
                        gzipOutputStream.flush();
                        outputStream.close();
                    }catch (Exception exception){
                        exception.printStackTrace();
                    }
                    gzipOutputStream.close();
                    //刪除zip文件解壓合並后的臨時文件
                    String tempfiles = hdfsFile + File.separator + inputFiles[i].getPath().getName().substring(0, inputFiles[i].getPath().getName().indexOf("."));
                    try{
                        if(hdfs.exists(new Path(tempfiles))){
                            hdfs.delete(new Path(tempfiles), true);
                        }
                    }catch(IOException ie){
                        ie.printStackTrace();
                    }
                }
            }
        }catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void errorMessage(String str) {
        System.out.println("Error Message: " + str);
        System.exit(1);
    }

}

調用:

[c@v09823]# hadoop jar myjar.jar [ConvertHdfsZipFileToGzipFile該main的類名根據打包方式決定是否需要] /zip/(待轉換文件路徑,在HDFS上) /user/j/pconline/(轉換完成后的文件存儲地址,也在HDFS上) pconline(待轉換的文件名包含的字符) 
如果要實現遞歸的話,可以在filesmerge后面加上 -r  

 執行過程中快照:

[c@v09823 ~]$ hadoop fs -ls /user/c/df/myzip
17/10/12 20:26:27 INFO hdfs.PeerCache: SocketCache disabled.
Found 30 items
-rw-r--r--+  3 c hadoop 2358927121 2017-10-11 21:57 user/c/df/myzip/myzip_0.zip
-rw-r--r--+  3 c hadoop 2361573235 2017-10-11 19:33 user/c/df/myzip/myzip_12.zip
-rw-r--r--+  3 c hadoop 2359169853 2017-10-11 19:34 user/c/df/myzip/myzip_15.zip
...


[c@v09823 ~]$ yarn jar My_ConvertHdfsZipFileToGzipFile.jar /user/c/df/myzip user/c/df/mygzip .zip
17/10/13 00:00:36 INFO hdfs.PeerCache: SocketCache disabled.
myzip_0.zip
myzip_12.zip
myzip_15.zip
...


[catt@vq20skjh01 ~]$ hadoop fs -ls -h user/c/df/mygzip
17/10/13 00:44:45 INFO hdfs.PeerCache: SocketCache disabled.
Found 3 items
-rw-r--r--+  3 c hadoop      14.9 G 2017-10-13 00:15 user/c/df/mygzip/myzip_0
-rw-r--r--+  3 c hadoop      14.9 G 2017-10-13 00:35 user/c/df/mygzip/myzip_12
-rw-r--r--+  3 c hadoop         6 G 2017-10-13 00:35 user/c/df/mygzip/myzip_15
....

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM