Hadoop權威指南:壓縮
目錄
文件壓縮的兩個好處:
- 減少儲存文件所需要的磁盤空間
- 加速數據在網絡和磁盤上的傳輸
壓縮格式總結:
壓縮格式 | 工具 | 算法 | 文件擴展名 | 是否可切分 |
---|---|---|---|---|
DEFLATE | 無 | DEFLATE | .deflate | 否 |
Gzip | gzip | DEFLATE | .gz | 否 |
bzip2 | bzip2 | bzip2 | .bz2 | 是 |
LZO | lzop | LZO | .lzo | 否 |
LZ4 | 無 | LZ4 | .lz4 | 否 |
Snappy | 無 | Snappy | .snapp | 否 |
上述表中的所有壓縮工具都提供9個不同的選項來控制壓縮時必須考慮的權衡:
- -1位優化壓縮速度
- -9為優化壓縮空間
codec
codec
實現了一種壓縮-解壓算法
Hadoop中一個對CompressionCodec
接口的實現代表一個codec
Hadoop的壓縮codec
壓縮格式 | HadoopCompressionCodec |
---|---|
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
gzip | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
LZ4 | org.apache.hadoop.io.compress.Lz4Codec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
通過CompressionCodec對數據流進行壓縮和解壓縮
CompressionCodec包含兩個函數
可以輕松用於壓縮和解壓縮數據
createOutputStream(OutputStream out)
方法在底層數據流中對需要以壓縮格式寫入在此之前尚未壓縮的數據新建一個CompressionOutputStream
對象- 對輸入數據流中讀取的數據進行解壓縮的時候,則調用
createInputStream(InputStream in)
獲取CompressionInputStream
,通過該方法從底層數據里讀取解壓縮后的數據
壓縮從標准輸入讀取的數據,並寫到標准輸出
代碼
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
public class StreamCompressor {
public static void main(String args[]) throws ClassNotFoundException, IOException {
// 符合ReflectionUtils的實現
String codecClassname = args[0];
Class<?> codecClass = Class.forName(codecClassname);
Configuration conf = new Configuration();
// 新建codec實例
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
// 獲得在System.out上支持壓縮的一個包裹方法
CompressionOutputStream out = codec.createOutputStream(System.out);
// 對IOUtils對象調用copyBytes()方法將輸入數據復制到輸出
// 輸出由CompressionOutputStream對象壓縮
IOUtils.copyBytes(System.in, out, 4096, false);
// 要求壓縮方法完成到壓縮數據流的寫操作
out.finish();
}
}
// echo "Test" | hadoop StreamCompressor org.apache.hadoop.io.compress.GzipCodec | gunzip
編譯
javac StreamCompressor.java
測試
壓縮從標准輸入讀取的數據,並寫到標准輸出,通過管道傳遞給gunzip, 顯示壓縮內容
echo "Test" | hadoop StreamCompressor org.apache.hadoop.io.compress.GzipCodec | gunzip
通過CompressionCodecFactory推斷CompressionCodec
根據文件擴展名選取codec解壓縮文件
代碼
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
public class FileDecompressor {
public static void main(String args[]) throws IOException {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path inputPath = new Path(uri);
CompressionCodecFactory factory = new CompressionCodecFactory(conf);
CompressionCodec codec = factory.getCodec(inputPath);
if (codec == null) {
System.err.println("No codec found for " + uri);
System.exit(1);
}
String outputUri = CompressionCodecFactory.removeSuffix(uri, codec.getDefaultExtension());
InputStream in = null;
OutputStream out = null;
try {
in = codec.createInputStream(fs.open(inputPath));
out = fs.create(new Path(outputUri));
IOUtils.copyBytes(in, out, conf);
} finally {
IOUtils.closeStream(in);
IOUtils.closeStream(out);
}
}
}
- 通過使用
CompressionCodecFactory
的getCodec()
方法, 得到相應的CompressionCodec
或者null
即CompressionCodecFactory
可通過搜索注冊的codec
找到匹配指定文件擴展名的codec
- 如果壓縮的是單個文本文件, 可以直接使用
cat
名查看解壓縮后生成的文件
編譯
javac FileDecompressor.java
運行
hadoop FileDecompressor /usr/hadoop/file.gz
file.gz
要在hdfs中存在- 會被解壓在hdfs中名為
file
的文件
壓縮代碼庫的實現
- 使用原生類庫來實現解壓縮, 會節約解壓縮的時間.
- 可以通過Java系統的
java.library.path
屬性指定原生代碼庫
壓縮格式 | 是否有Java實現 | 是否有原生(native)實現 |
---|---|---|
DEFLATE | 是 | 是 |
gzip | 是 | 是 |
bzip2 | 是 | 否 |
LZO | 否 | 是 |
LZ4 | 否 | 是 |
Snappy | 否 | 是 |
CodecPool
CodecPool
對象支持反復使用壓縮和解壓縮操作, 以減少創建對象的開銷
使用壓縮池對讀取自標准輸入的數據進行壓縮,然后將其寫到標准輸出
代碼
/*
* Hadoop權威指南:第四章
* */
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.Compressor;
import org.apache.hadoop.oncrpc.security.SysSecurityHandler;
import org.apache.hadoop.util.ReflectionUtils;
import java.io.IOException;
public class PooledStreamCompressor {
public static void main(String args[]) throws ClassNotFoundException, IOException {
String codecClassName = args[0];
// 找到對應CompressionCodec類
Class<?> codeClass = Class.forName(codecClassName);
Configuration conf = new Configuration();
// 通過類名創建實例對象
CompressionCodec codec = (CompressionCodec) ReflectionUtils.newInstance(codeClass, conf);
Compressor compressor = null;
try {
// 得到compressor
compressor = CodecPool.getCompressor(codec);
CompressionOutputStream out = codec.createOutputStream(System.out, compressor);
IOUtils.copyBytes(System.in, out, 4096, false);
out.flush();
} finally {
// 將compressor返回CodecPool中
CodecPool.returnCompressor(compressor);
}
}
}
- 在
codec
的重載方法createOutputStream
中,對於制定的CompressionCodec
,我們從池中獲取一個Compressor
實例
壓縮和輸入分片
- 了解這些壓縮格式是否支持切分非常重要,例如, 文件壓縮為某種個格式后,為1G, 它將被儲存成16塊(每塊是64M的話), 如果該壓縮格式不支持切分, 那么就每個塊就不能單獨作為輸入文件,無法實現從壓縮數據流任意位置讀取數據
應該使用哪種壓縮格式?
大致按照效率從高到低排列:
- 適應容器文件格式,例如順序文件,REFile或者Avro數據問津, 這些格式同時支持壓縮和切分.通常最好與一個快速壓縮工具聯合使用,例如:LZO,LZ4或者Snappy
- 使用支持切分的壓縮格式, 例如bzip2, 或者使用通過索引實現切分的壓縮格式, 例如:LZO
- 在應用中將文件切分成塊,並使用任意一種壓縮格式為每個數據塊建立壓縮文件(不論它是否支持切分).這時, 需要合理選擇數據塊大小, 以確保壓縮后數據塊的大小近似於HDFS塊的大小
- 儲存未經壓縮的文件
對於大文件來說,不要使用不支持切分整個文件的壓縮格式,因為會失去數據的本地特性,進而造成MapReduce應用效率低下
在MapReduce中使用壓縮
解壓縮輸入
- 如果輸入文件是壓縮的,那么在根據文件擴展名推斷出相應的
codec
后,MapReduce會在讀取文件是自動解壓縮文件
解壓縮輸出
- 要想壓縮MapReduce作業的輸出,應在作業配置過程中將
mapred.output.compress
設為true
和mapred.output.compression.codec
屬性設置為打算使用的壓縮codec的類名 - 要想壓縮MapReduce作業的輸出,還可以在
FileOutputFormat
中使用更便捷的方法設置這些屬性
對查找最高溫作業所產生輸出進行壓縮
代碼
/*
* Hadoop權威指南:第四章
* */
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class MaxTemperatureWithCompression {
public static void main(String args[]) throws IOException, ClassNotFoundException, InterruptedException {
if (args.length != 2) {
System.err.println("Usage: MaxTemperatureWithCompression <input path> <output path>");
System.exit(-1);
}
Job job = new Job();
job.setJarByClass(MaxTempreture.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
job.setMapperClass(MaxTemperatureMapper.class);
job.setCombinerClass(MaxTemperatureReducer.class);
job.setReducerClass(MaxTemperatureReducer.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
運行
hadoop MaxTemperatureWithCompression input/sample.txt.gz output
最終輸出的每個部分都是經過壓縮的
查看輸出
gunzip -c output/part-c-00000.gz
為輸出生成順序文件(sequence file)
- 可以設置
mapred.output.compression.type
屬性來控制限制使用壓縮格式,默認值為RECORD
, 即對每一條記錄進行壓縮, 如果將其改為BLOCK
,將針對一組記錄進行壓縮 - 在
SequenceFileOutputFormat
類中還有一個靜態方法putCompressionType()
可用來便捷地設置該屬性
MapReduce壓縮格式
下表歸納概述了用於設置MapReduce作業輸出的壓縮格式的配置屬性
屬性名稱 | 類型 | 默認值 | 描述 |
---|---|---|---|
mapred.output.compress | boolean | false | 壓縮輸出 |
mapred.output.compression.codec | 類名稱 | org.apache.hadoop.io.compress.DefaultCodec | map輸出所用的壓縮codec |
mapred.output.compression.type | String | RECORD | SqeuenceFile的輸出可以使用的壓縮類型:NONE,RECORD,BLOCK |
對map任務輸出進行壓縮
map任務的輸出需要寫到磁盤並通過網絡傳輸到reducer節點,所以如果使用LZO,LZ4或者Snappy這樣的快速壓縮方式,是可以獲取性能提升的
map任務輸出的壓縮屬性
屬性名稱 | 類型 | 默認值 | 描述 |
---|---|---|---|
mapred.compress.map.output | boolean | false | 對map任務輸出進行壓縮 |
mapred.map.output.compression.codec | Class | org.apache.hadoop.io.compress.DefaultCodec | map輸出所有的壓縮codec |
示例代碼
Configuration conf = new Configuration();
conf.setBoolean("mapred.compress.map.output", true);
conf.setClass("mapred.map.output.compression.codec", GzipCodec.class, CompressionCodec.class);
Job job = new Job(conf);