Hadoop MapReduce中壓縮技術的使用


Compression and Input Splits
 
當我們使用壓縮數據作為MapReduce的輸入時,需要確認數據的壓縮格式是否支持切片?
 
假設HDFS中有一個未經壓縮的大小為1GB的文本文件,如果HDFS Block大小為128MB,那么這個文件會被HDFS存儲為8個Block。當MapReduce Job使用這個文件作為輸入時將會創建8個切片(默認每一個Block生成一個切片),每一個切片關聯的數據都可以被一個Map Task獨立地處理。
 
如果這個文本文件使用Gzip格式壓縮,大小仍為1GB,如前所述,它也會被HDFS存儲為8個Block。可是當MapReduce Job使用這個文件作為輸入時,為每一個Block生成一個切片是不可行的,取而代之的是整個文件將作為一個切片被一個Map Task所處理。
 
Gzip使用DEFLATE存儲壓縮數據,DEFLATE將數據存儲為一系列的壓縮數據塊,可是這些壓縮數據塊的邊界是無法區分的,導致在數據流中無法定位某個數據塊的起始位置。也就是說,我們無法隨意地指定一個位置(該位置不一定恰好是某數據塊的起始位置),然后移動到下一個數據塊的起始位置讀取數據。基本這個原因,Gzip格式的文件是不支持切片的。
 
對於這種情況,MapReduce是可以作出正確處理的,通過文件后綴名(文件后綴名直接影響壓縮數據格式的判斷)可以判斷出這個文件是以Gzip格式進行壓縮的,不支持切片,會將整個文件作為一個切片進行處理。但這樣做是有很大代價的,一個Map Task要處理整個文件的數據,而且大部分數據並不是“數據本地性”的。
 
如果這個文本文件使用LZO格式壓縮,同樣的問題也會存在,但是Hadoop LZO Library提供了一個用於預處理LZO文件的切片索引工具,可以簡單地認為生成的索引文件中保存着各個切片的起始位置,再配合合適的InputFormat(如:LzoTextInputFormat),運行MapReduce Job時就可以支持切片。
 
Bzip2的各個數據塊之間存放有專門的“Synchronization Marker”,因此它是可以支持切片的。
 
Hadoop通常處理的都是大規模的數據集,因此我們必須盡可能的利用壓縮優化性能。具體使用哪一個壓縮格式依賴於文件大小、文件格式以及我們使用的分析工具。以下是一些使用建議:
 
(1)使用一些容器文件格式,如Sequence File、Avro DataFile、ORCFile、Parquet File,這些文件格式全部支持壓縮和切片,配合一個快速的壓縮算法(如:LZO、LZ4、Snappy)使用通常是一個好的選擇;
 
(2)使用一個支持切片的壓縮算法,如bzip2、lzo(通過索引處理之后可以支持切片);
 
(3)將一個文件人為地切分為Chunk(即一個文件被切分為多個文件),然后將這些Chunks逐個的進行壓縮,可以使用任意支持的壓縮算法,且不需要考慮壓縮算法是否支持切片,但是Chunk壓縮后的大小需要接近於HDFS Block的大小;
 
(4)文件不作壓縮處理。
 
對於大型的文件,我們不能選擇不支持切片的壓縮算法,因為這會導致MapReduce Job喪失數據本地性且運行效率低下。
 
Using Compression in MapReduce
 
MapReduce讀取輸入路徑中的壓縮文件時會自動完成數據解壓(可參考CompressionCodecFactory)。
 
如果MapReduce Job的結果輸出需要使用壓縮,可以通過設置Job的相關配置屬性來實現:
 
mapreduce.output.fileoutputformat.compress:true
 
mapreduce.output.fileoutputformat.compress.codec:CompressionCodec全限定類名
 
也可以通過FileOutputFormat提供的靜態方法設置,如:
 
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
 
不同的輸出文件格式可能相應的設置屬性會有不同。
 
Compressing map output
 
Map Task的輸出被寫出到本地磁盤,而且需要通過網絡傳輸至Reduce Task的節點,只要簡單地使用一個快速的壓縮算法(如LZO、LZ4、Snappy)就可以帶來性能的提升,因為壓縮機制的使用避免了Map Tasks與Reduce Tasks之間大量中間結果數據被傳輸。可以通過設置相應的Job配置屬性開啟:
 
mapreduce.map.output.compress:true
 
mapreduce.map.output.compress.codec:CompressionCodec全限定類名
 
也可以通過Configuration API進行設置:
 
new API:
 
Configuration conf = new Configuration();
conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);
Job job = new Job(conf);
 
old API:
 
conf.setCompressMapOutput(true);
conf.setMapOutputCompressorClass(GzipCodec.class);
 
 
 
 
 
 
 
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM