Hadoop權威指南:壓縮


Hadoop權威指南:壓縮

文件壓縮的兩個好處:

  • 減少儲存文件所需要的磁盤空間
  • 加速數據在網絡和磁盤上的傳輸

壓縮格式總結:

壓縮格式 工具 算法 文件擴展名 是否可切分
DEFLATE DEFLATE .deflate
Gzip gzip DEFLATE .gz
bzip2 bzip2 bzip2 .bz2
LZO lzop LZO .lzo
LZ4 LZ4 .lz4
Snappy Snappy .snapp

上述表中的所有壓縮工具都提供9個不同的選項來控制壓縮時必須考慮的權衡:

  • -1位優化壓縮速度
  • -9為優化壓縮空間

codec

codec 實現了一種壓縮-解壓算法

Hadoop中一個對CompressionCodec接口的實現代表一個codec

Hadoop的壓縮codec

壓縮格式 HadoopCompressionCodec
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
LZ4 org.apache.hadoop.io.compress.Lz4Codec
Snappy org.apache.hadoop.io.compress.SnappyCodec

通過CompressionCodec對數據流進行壓縮和解壓縮

CompressionCodec包含兩個函數 可以輕松用於壓縮和解壓縮數據

  • createOutputStream(OutputStream out)方法在底層數據流中對需要以壓縮格式寫入在此之前尚未壓縮的數據新建一個CompressionOutputStream對象
  • 對輸入數據流中讀取的數據進行解壓縮的時候,則調用createInputStream(InputStream in)獲取CompressionInputStream,通過該方法從底層數據里讀取解壓縮后的數據

壓縮從標准輸入讀取的數據,並寫到標准輸出

代碼

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;

public class StreamCompressor {
    public static void main(String  args[]) throws ClassNotFoundException, IOException {
        // 符合ReflectionUtils的實現
        String codecClassname = args[0];
        Class<?> codecClass = Class.forName(codecClassname);
        Configuration conf = new Configuration();
        // 新建codec實例
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);

        // 獲得在System.out上支持壓縮的一個包裹方法
        CompressionOutputStream out = codec.createOutputStream(System.out);
        // 對IOUtils對象調用copyBytes()方法將輸入數據復制到輸出
        // 輸出由CompressionOutputStream對象壓縮
        IOUtils.copyBytes(System.in, out, 4096, false);
        // 要求壓縮方法完成到壓縮數據流的寫操作
        out.finish();
    }
}

// echo "Test" | hadoop StreamCompressor org.apache.hadoop.io.compress.GzipCodec | gunzip

編譯

javac StreamCompressor.java

測試

壓縮從標准輸入讀取的數據,並寫到標准輸出,通過管道傳遞給gunzip, 顯示壓縮內容

echo "Test" | hadoop StreamCompressor org.apache.hadoop.io.compress.GzipCodec | gunzip

通過CompressionCodecFactory推斷CompressionCodec

根據文件擴展名選取codec解壓縮文件

代碼

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;

public class FileDecompressor {
    public static void main(String args[]) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);

        Path inputPath = new Path(uri);
        CompressionCodecFactory factory = new CompressionCodecFactory(conf);
        CompressionCodec codec = factory.getCodec(inputPath);
        if (codec == null) {
            System.err.println("No codec found for " + uri);
            System.exit(1);
        }

        String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());

        InputStream in = null;
        OutputStream out = null;
        try {
            in = codec.createInputStream(fs.open(inputPath));
            out = fs.create(new Path(outputUri));
            IOUtils.copyBytes(in, out, conf);
        } finally {
            IOUtils.closeStream(in);
            IOUtils.closeStream(out);
        }
    }
}
  • 通過使用CompressionCodecFactorygetCodec()方法, 得到相應的CompressionCodec或者nullCompressionCodecFactory可通過搜索注冊的codec 找到匹配指定文件擴展名的 codec
  • 如果壓縮的是單個文本文件, 可以直接使用cat名查看解壓縮后生成的文件

編譯

javac FileDecompressor.java

運行

hadoop FileDecompressor /usr/hadoop/file.gz

  • file.gz 要在hdfs中存在
  • 會被解壓在hdfs中名為file的文件

壓縮代碼庫的實現

  • 使用原生類庫來實現解壓縮, 會節約解壓縮的時間.
  • 可以通過Java系統的java.library.path屬性指定原生代碼庫
壓縮格式 是否有Java實現 是否有原生(native)實現
DEFLATE
gzip
bzip2
LZO
LZ4
Snappy

CodecPool

  • CodecPool對象支持反復使用壓縮和解壓縮操作, 以減少創建對象的開銷

使用壓縮池對讀取自標准輸入的數據進行壓縮,然后將其寫到標准輸出

代碼

/*
* Hadoop權威指南:第四章
* */

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.oncrpc.security.SysSecurityHandler;
import org.apache.hadoop.util.ReflectionUtils;

import java.io.IOException;

public class PooledStreamCompressor {
    public static void main(String args[]) throws ClassNotFoundException, IOException {
        String codecClassName = args[0];
        // 找到對應CompressionCodec類
        Class<?> codeClass = Class.forName(codecClassName);
        Configuration conf = new Configuration();
        // 通過類名創建實例對象
        CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codeClass, conf);
        Compressor compressor = null;

        try {
            // 得到compressor
            compressor = CodecPool.getCompressor(codec);
            CompressionOutputStream out = codec.createOutputStream(System.out, compressor);
            IOUtils.copyBytes(System.in, out, 4096, false);
            out.flush();
        } finally {
            // 將compressor返回CodecPool中
            CodecPool.returnCompressor(compressor);
        }
    }
}
  • codec的重載方法createOutputStream中,對於制定的CompressionCodec,我們從池中獲取一個Compressor實例

壓縮和輸入分片

  • 了解這些壓縮格式是否支持切分非常重要,例如, 文件壓縮為某種個格式后,為1G, 它將被儲存成16塊(每塊是64M的話), 如果該壓縮格式不支持切分, 那么就每個塊就不能單獨作為輸入文件,無法實現從壓縮數據流任意位置讀取數據

應該使用哪種壓縮格式?

大致按照效率從高到低排列:

  • 適應容器文件格式,例如順序文件,REFile或者Avro數據問津, 這些格式同時支持壓縮和切分.通常最好與一個快速壓縮工具聯合使用,例如:LZO,LZ4或者Snappy
  • 使用支持切分的壓縮格式, 例如bzip2, 或者使用通過索引實現切分的壓縮格式, 例如:LZO
  • 在應用中將文件切分成塊,並使用任意一種壓縮格式為每個數據塊建立壓縮文件(不論它是否支持切分).這時, 需要合理選擇數據塊大小, 以確保壓縮后數據塊的大小近似於HDFS塊的大小
  • 儲存未經壓縮的文件

對於大文件來說,不要使用不支持切分整個文件的壓縮格式,因為會失去數據的本地特性,進而造成MapReduce應用效率低下

在MapReduce中使用壓縮

解壓縮輸入

  • 如果輸入文件是壓縮的,那么在根據文件擴展名推斷出相應的codec后,MapReduce會在讀取文件是自動解壓縮文件

解壓縮輸出

  • 要想壓縮MapReduce作業的輸出,應在作業配置過程中將mapred.output.compress設為truemapred.output.compression.codec屬性設置為打算使用的壓縮codec的類名
  • 要想壓縮MapReduce作業的輸出,還可以在FileOutputFormat中使用更便捷的方法設置這些屬性

對查找最高溫作業所產生輸出進行壓縮

代碼

/*
* Hadoop權威指南:第四章
* */

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MaxTemperatureWithCompression {
    public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException {
        if (args.length != 2) {
            System.err.println("Usage: MaxTemperatureWithCompression <input path> <output path>");
            System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(MaxTempreture.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

運行

hadoop MaxTemperatureWithCompression input/sample.txt.gz output

最終輸出的每個部分都是經過壓縮的

查看輸出

gunzip -c output/part-c-00000.gz

為輸出生成順序文件(sequence file)

  • 可以設置mapred.output.compression.type屬性來控制限制使用壓縮格式,默認值為RECORD, 即對每一條記錄進行壓縮, 如果將其改為BLOCK,將針對一組記錄進行壓縮
  • SequenceFileOutputFormat類中還有一個靜態方法putCompressionType()可用來便捷地設置該屬性
MapReduce壓縮格式

下表歸納概述了用於設置MapReduce作業輸出的壓縮格式的配置屬性

屬性名稱 類型 默認值 描述
mapred.output.compress boolean false 壓縮輸出
mapred.output.compression.codec 類名稱 org.apache.hadoop.io.compress.DefaultCodec map輸出所用的壓縮codec
mapred.output.compression.type String RECORD SqeuenceFile的輸出可以使用的壓縮類型:NONE,RECORD,BLOCK

對map任務輸出進行壓縮

map任務的輸出需要寫到磁盤並通過網絡傳輸到reducer節點,所以如果使用LZO,LZ4或者Snappy這樣的快速壓縮方式,是可以獲取性能提升的

map任務輸出的壓縮屬性

屬性名稱 類型 默認值 描述
mapred.compress.map.output boolean false 對map任務輸出進行壓縮
mapred.map.output.compression.codec Class org.apache.hadoop.io.compress.DefaultCodec map輸出所有的壓縮codec

示例代碼

Configuration conf = new Configuration();
conf.setBoolean("mapred.compress.map.output", true);
conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
Job job = new Job(conf);


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM