一 Hadoop數據壓縮
1.1 概述
壓縮技術能夠有效減少底層存儲系統(HDFS)讀寫字節數。壓縮提高了網絡帶寬和磁盤空間的效率。在Hadood下,尤其是數據規模很大和工作負載密集的情況下,使用數據壓縮顯得非常重要。在這種情況下,I/O操作和網絡數據傳輸要花大量的時間。還有,Shuffle與Merge過程同樣也面臨着巨大的I/O壓力。
鑒於磁盤I/O和網絡帶寬是Hadoop的寶貴資源,數據壓縮對於節省資源、最小化磁盤I/O和網絡傳輸非常有幫助。不過,盡管壓縮與解壓操作的CPU開銷不高,其性能的提升和資源的節省並非沒有代價。
如果磁盤I/O和網絡帶寬影響了MapReduce作業性能,在任意MapReduce階段啟用壓縮都可以改善端到端處理時間並減少I/O和網絡流量。
壓縮mapreduce的一種優化策略:通過壓縮編碼對mapper或者reducer的輸出進行壓縮,以減少磁盤IO,提高MR程序運行速度(但相應增加了cpu運算負擔)。
注意:壓縮特性運用得當能提高性能,但運用不當也可能降低性能。
基本原則:
(1)運算密集型的job,少用壓縮
(2)IO密集型的job,多用壓縮
1.2 MR支持的壓縮編碼
壓縮格式 |
hadoop自帶? |
算法 |
文件擴展名 |
是否可切分 |
換成壓縮格式后,原來的程序是否需要修改 |
DEFAULT |
是,直接使用 |
DEFAULT |
.deflate |
否 |
和文本處理一樣,不需要修改 |
Gzip |
是,直接使用 |
DEFAULT |
.gz |
否 |
和文本處理一樣,不需要修改 |
bzip2 |
是,直接使用 |
bzip2 |
.bz2 |
是 |
和文本處理一樣,不需要修改 |
LZO |
否,需要安裝 |
LZO |
.lzo |
是 |
需要建索引,還需要指定輸入格式 |
Snappy |
否,需要安裝 |
Snappy |
.snappy |
否 |
和文本處理一樣,不需要修改 |
為了支持多種壓縮/解壓縮算法,Hadoop引入了編碼/解碼器,如下表所示
壓縮格式 |
對應的編碼/解碼器 |
DEFLATE |
org.apache.hadoop.io.compress.DefaultCodec |
gzip |
org.apache.hadoop.io.compress.GzipCodec |
bzip2 |
org.apache.hadoop.io.compress.BZip2Codec |
LZO |
com.hadoop.compression.lzo.LzopCodec |
Snappy |
org.apache.hadoop.io.compress.SnappyCodec |
壓縮性能的比較
壓縮算法 |
原始文件大小 |
壓縮文件大小 |
壓縮速度 |
解壓速度 |
gzip |
8.3GB |
1.8GB |
17.5MB/s |
58MB/s |
bzip2 |
8.3GB |
1.1GB |
2.4MB/s |
9.5MB/s |
LZO |
8.3GB |
2.9GB |
49.3MB/s |
74.6MB/s |
http://google.github.io/snappy/
On a single core of a Core i7 processor in 64-bit mode, Snappy compresses at about 250 MB/sec or more and decompresses at about 500 MB/sec or more.
1.3 各種壓縮方式詳解
1.3.1 Gzip壓縮
優點:壓縮率比較高,而且壓縮/解壓速度也比較快;hadoop本身支持,在應用中處理gzip格式的文件就和直接處理文本一樣;大部分linux系統都自帶gzip命令,使用方便。
缺點:不支持split。
應用場景:當每個文件壓縮之后在130M以內的(1個塊大小內),都可以考慮用gzip壓縮格式。譬如說一天或者一個小時的日志壓縮成一個gzip文件,運行mapreduce程序的時候通過多個gzip文件達到並發。hive程序,streaming程序,和java寫的mapreduce程序完全和文本處理一樣,壓縮之后原來的程序不需要做任何修改。
1.3.2 lzo壓縮
優點:壓縮/解壓速度也比較快,合理的壓縮率;支持split,是hadoop中最流行的壓縮格式;可以在linux系統下安裝lzop命令,使用方便。
缺點:壓縮率比gzip要低一些;hadoop本身不支持,需要安裝;在應用中對lzo格式的文件需要做一些特殊處理(為了支持split需要建索引,還需要指定inputformat為lzo格式)。
應用場景:一個很大的文本文件,壓縮之后還大於200M以上的可以考慮,而且單個文件越大,lzo優點越越明顯。
1.3.3 snappy壓縮
優點:高速壓縮速度和合理的壓縮率。
缺點:不支持split;壓縮率比gzip要低;hadoop本身不支持,需要安裝;
應用場景:當mapreduce作業的map輸出的數據比較大的時候,作為map到reduce的中間數據的壓縮格式;或者作為一個mapreduce作業的輸出和另外一個mapreduce作業的輸入。
1.3.4 bzip2壓縮
優點:支持split;具有很高的壓縮率,比gzip壓縮率都高;hadoop本身支持,但不支持native;在linux系統下自帶bzip2命令,使用方便。
缺點:壓縮/解壓速度慢;不支持native。
應用場景:適合對速度要求不高,但需要較高的壓縮率的時候,可以作為mapreduce作業的輸出格式;或者輸出之后的數據比較大,處理之后的數據需要壓縮存檔減少磁盤空間並且以后數據用得比較少的情況;或者對單個很大的文本文件想壓縮減少存儲空間,同時又需要支持split,而且兼容之前的應用程序(即應用程序不需要修改)的情況。
1.3.5 如何選擇壓縮格式?
Hadoop應用處理的數據集非常大,因此需要借助於壓縮。使用哪種壓縮格式與待處理的文件的大小、格式和所使用的工具相關。下面我們給出了一些建議,大致是按照效率從高到低排序的。
1)使用容器文件格式,例如順序文件、RCFile或者Avro 數據文件,所有這些文件格式同時支持壓縮和切分。通常最好與一個快速壓縮工具聯合使用,例如LZO,LZ4或者 Snappy。
2)使用支持切分的壓縮格式,例如bzip2(盡管bzip2 非常慢),或者使用通過索引實現切分的壓縮格式,例如LZO。
3)在應用中將文件切分成塊,並使用任意一種壓縮格式為每個數據塊建立壓縮文件(不論它是否支持切分)。這種情況下,需要合理選擇數據塊的大小,以確保壓縮后數據塊的大小近似與HDFS塊的大小。
4)存儲未經壓縮的文件。
對大文件來說,不要使用不支持切分整個文件的壓縮格式,因為會失去數據的本地特性,進而造成MapReduce應用效率低下。
1.4 采用壓縮的位置
壓縮可以在MapReduce作用的任意階段啟用。
1)輸入壓縮:
在有大量數據並計划重復處理的情況下,應該考慮對輸入進行壓縮。然而,你無須顯示指定使用的編解碼方式。Hadoop自動檢查文件擴展名,如果擴展名能夠匹配,就會用恰當的編解碼方式對文件進行壓縮和解壓。否則,Hadoop就不會使用任何編解碼器。
2)壓縮mapper輸出:
當map任務輸出的中間數據量很大時,應考慮在此階段采用壓縮技術。這能顯著改善內部數據Shuffle過程,而Shuffle過程在Hadoop處理過程中是資源消耗最多的環節。如果發現數據量大造成網絡傳輸緩慢,應該考慮使用壓縮技術。可用於壓縮mapper輸出的快速編解碼器包括LZO或者Snappy。
注:LZO是供Hadoop壓縮數據用的通用壓縮編解碼器。其設計目標是達到與硬盤讀取速度相當的壓縮速度,因此速度是優先考慮的因素,而不是壓縮率。與gzip編解碼器相比,它的壓縮速度是gzip的5倍,而解壓速度是gzip的2倍。同一個文件用LZO壓縮后比用gzip壓縮后大50%,但比壓縮前小25%~50%。這對改善性能非常有利,map階段完成時間快4倍。
3)壓縮reducer輸出:
在此階段啟用壓縮技術能夠減少要存儲的數據量,因此降低所需的磁盤空間。當mapreduce作業形成作業鏈條時,因為第二個作業的輸入也已壓縮,所以啟用壓縮同樣有效。
1.5 壓縮配置參數
要在Hadoop中啟用壓縮,可以配置如下參數(mapred-site.xml文件中):
參數 |
默認值 |
階段 |
建議 |
io.compression.codecs (在core-site.xml中配置) |
org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec, org.apache.hadoop.io.compress.Lz4Codec |
輸入壓縮 |
Hadoop使用文件擴展名判斷是否支持某種編解碼器 |
mapreduce.map.output.compress |
false |
mapper輸出 |
這個參數設為true啟用壓縮 |
mapreduce.map.output.compress.codec |
org.apache.hadoop.io.compress.DefaultCodec |
mapper輸出 |
使用LZO、LZ4或snappy編解碼器在此階段壓縮數據 |
mapreduce.output.fileoutputformat.compress |
false |
reducer輸出 |
這個參數設為true啟用壓縮 |
mapreduce.output.fileoutputformat.compress.codec |
org.apache.hadoop.io.compress. DefaultCodec |
reducer輸出 |
使用標准工具或者編解碼器,如gzip和bzip2 |
mapreduce.output.fileoutputformat.compress.type |
RECORD |
reducer輸出 |
SequenceFile輸出使用的壓縮類型:NONE和BLOCK |
1.6 壓縮實戰
壓縮案例詳見壓縮/解壓縮。
二 Hadoop企業優化
2.1 MapReduce跑慢的原因
Mapreduce 程序效率的瓶頸在於兩點:
1)計算機性能
CPU、內存、磁盤健康、網絡
2)I/O 操作優化
(1)數據傾斜
(2)map和reduce數設置不合理
(3)reduce等待過久
(4)小文件過多
(5)大量的不可分塊的超大文件
(6)spill次數過多
(7)merge次數過多等。
2.2 MapReduce優化方法
MapReduce優化方法主要從以下六個方面考慮:
2.2.1 數據輸入
(1)合並小文件:在執行mr任務前將小文件進行合並,大量的小文件會產生大量的map任務,增大map任務裝載次數,而任務的裝載比較耗時,從而導致 mr 運行較慢。
(2)采用ConbinFileInputFormat來作為輸入,解決輸入端大量小文件場景。
2.2.2 Map階段
1)減少spill次數:通過調整io.sort.mb及sort.spill.percent參數值,增大觸發spill的內存上限,減少spill次數,從而減少磁盤 IO。
2)減少merge次數:通過調整io.sort.factor參數,增大merge的文件數目,減少merge的次數,從而縮短mr處理時間。
3)在 map 之后先進行combine處理,減少 I/O。
2.2.3 Reduce階段
1)合理設置map和reduce數:兩個都不能設置太少,也不能設置太多。太少,會導致task等待,延長處理時間;太多,會導致 map、reduce任務間競爭資源,造成處理超時等錯誤。
2)設置map、reduce共存:調整slowstart.completedmaps參數,使map運行到一定程度后,reduce也開始運行,減少reduce的等待時間。
3)規避使用reduce,因為Reduce在用於連接數據集的時候將會產生大量的網絡消耗。
4)合理設置reduc端的buffer,默認情況下,數據達到一個閾值的時候,buffer中的數據就會寫入磁盤,然后reduce會從磁盤中獲得所有的數據。也就是說,buffer和reduce是沒有直接關聯的,中間多個一個寫磁盤->讀磁盤的過程,既然有這個弊端,那么就可以通過參數來配置,使得buffer中的一部分數據可以直接輸送到reduce,從而減少IO開銷:mapred.job.reduce.input.buffer.percent,默認為0.0。當值大於0的時候,會保留指定比例的內存讀buffer中的數據直接拿給reduce使用。這樣一來,設置buffer需要內存,讀取數據需要內存,reduce計算也要內存,所以要根據作業的運行情況進行調整。
2.2.4 IO傳輸
1)采用數據壓縮的方式,減少網絡IO的的時間。安裝Snappy和LZOP壓縮編碼器。
2)使用SequenceFile二進制文件
2.2.5 數據傾斜問題
1)數據傾斜現象
數據頻率傾斜——某一個區域的數據量要遠遠大於其他區域。
數據大小傾斜——部分記錄的大小遠遠大於平均值。
2)如何收集傾斜數據
在reduce方法中加入記錄map輸出鍵的詳細情況的功能。
public static final String MAX_VALUES = "skew.maxvalues"; private int maxValueThreshold;
@Override public void configure(JobConf job) { maxValueThreshold = job.getInt(MAX_VALUES, 100); } @Override public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException { int i = 0; while (values.hasNext()) { values.next(); i++; }
if (++i > maxValueThreshold) { log.info("Received " + i + " values for key " + key); } } |
3)減少數據傾斜的方法
方法1:抽樣和范圍分區
可以通過對原始數據進行抽樣得到的結果集來預設分區邊界值。
方法2:自定義分區
另一個抽樣和范圍分區的替代方案是基於輸出鍵的背景知識進行自定義分區。例如,如果map輸出鍵的單詞來源於一本書。其中大部分必然是省略詞(stopword)。那么就可以將自定義分區將這部分省略詞發送給固定的一部分reduce實例。而將其他的都發送給剩余的reduce實例。
方法3:Combine
使用Combine可以大量地減小數據頻率傾斜和數據大小傾斜。在可能的情況下,combine的目的就是聚合並精簡數據。
方法4:采用Map Join,盡量避免Reduce Join。
2.2.6 常用的調優參數
1)資源相關參數
(1)以下參數是在用戶自己的mr應用程序中配置就可以生效(mapred-default.xml)
配置參數 |
參數說明 |
mapreduce.map.memory.mb |
一個Map Task可使用的資源上限(單位:MB),默認為1024。如果Map Task實際使用的資源量超過該值,則會被強制殺死。 |
mapreduce.reduce.memory.mb |
一個Reduce Task可使用的資源上限(單位:MB),默認為1024。如果Reduce Task實際使用的資源量超過該值,則會被強制殺死。 |
mapreduce.map.cpu.vcores |
每個Map task可使用的最多cpu core數目,默認值: 1 |
mapreduce.reduce.cpu.vcores |
每個Reduce task可使用的最多cpu core數目,默認值: 1 |
mapreduce.reduce.shuffle.parallelcopies |
每個reduce去map中拿數據的並行數。默認值是5 |
mapreduce.reduce.shuffle.merge.percent |
buffer中的數據達到多少比例開始寫入磁盤。默認值0.66 |
mapreduce.reduce.shuffle.input.buffer.percent |
buffer大小占reduce可用內存的比例。默認值0.7 |
mapreduce.reduce.input.buffer.percent |
指定多少比例的內存用來存放buffer中的數據,默認值是0.0 |
(2)應該在yarn啟動之前就配置在服務器的配置文件中才能生效(yarn-default.xml)
配置參數 |
參數說明 |
yarn.scheduler.minimum-allocation-mb 1024 |
給應用程序container分配的最小內存 |
yarn.scheduler.maximum-allocation-mb 8192 |
給應用程序container分配的最大內存 |
yarn.scheduler.minimum-allocation-vcores 1 |
每個container申請的最小CPU核數 |
yarn.scheduler.maximum-allocation-vcores 32 |
每個container申請的最大CPU核數 |
yarn.nodemanager.resource.memory-mb 8192 |
給containers分配的最大物理內存 |
(3)shuffle性能優化的關鍵參數,應在yarn啟動之前就配置好(mapred-default.xml)
配置參數 |
參數說明 |
mapreduce.task.io.sort.mb 100 |
shuffle的環形緩沖區大小,默認100m |
mapreduce.map.sort.spill.percent 0.8 |
環形緩沖區溢出的閾值,默認80% |
2)容錯相關參數(mapreduce性能優化)
配置參數 |
參數說明 |
mapreduce.map.maxattempts |
每個Map Task最大重試次數,一旦重試參數超過該值,則認為Map Task運行失敗,默認值:4。 |
mapreduce.reduce.maxattempts |
每個Reduce Task最大重試次數,一旦重試參數超過該值,則認為Map Task運行失敗,默認值:4。 |
mapreduce.task.timeout |
Task超時時間,經常需要設置的一個參數,該參數表達的意思為:如果一個task在一定時間內沒有任何進入,即不會讀取新的數據,也沒有輸出數據,則認為該task處於block狀態,可能是卡住了,也許永遠會卡主,為了防止因為用戶程序永遠block住不退出,則強制設置了一個該超時時間(單位毫秒),默認是600000。如果你的程序對每條輸入數據的處理時間過長(比如會訪問數據庫,通過網絡拉取數據等),建議將該參數調大,該參數過小常出現的錯誤提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。 |
2.3 HDFS小文件優化方法
2.3.1 HDFS小文件弊端
HDFS上每個文件都要在namenode上建立一個索引,這個索引的大小約為150byte,這樣當小文件比較多的時候,就會產生很多的索引文件,一方面會大量占用namenode的內存空間,另一方面就是索引文件過大是的索引速度變慢。
2.3.2 解決方案
1)Hadoop Archive:
是一個高效地將小文件放入HDFS塊中的文件存檔工具,它能夠將多個小文件打包成一個HAR文件,這樣在減少namenode內存使用的同時。
2)Sequence file:
sequence file由一系列的二進制key/value組成,如果key為文件名,value為文件內容,則可以將大批小文件合並成一個大文件。
3)CombineFileInputFormat:
CombineFileInputFormat是一種新的inputformat,用於將多個文件合並成一個單獨的split,另外,它會考慮數據的存儲位置。
4)開啟JVM重用
對於大量小文件Job,可以開啟JVM重用會減少45%運行時間。
JVM重用理解:一個map運行一個jvm,重用的話,在一個map在jvm上運行完畢后,jvm繼續運行其他jvm
具體設置:mapreduce.job.jvm.numtasks值在10-20之間。
三 MapReduce實戰
3.2 大數據技術之流量匯總案例
3.3 大數據技術之輔助排序和二次排序案例(GroupingComparator)
3.5 大數據技術之小文件處理(自定義InputFormat)
3.6 過濾日志及自定義日志輸出路徑(自定義OutputFormat)
3.7 大數據技術之日志清洗案例
3.9 大數據技術之找博客共同好友案例
3.10 大數據技術之壓縮解壓縮案例
四 常見錯誤
1)導包容易出錯。尤其Text.
2)Mapper中第一個輸入的參數必須是LongWritable或者NullWritable,不可以是IntWritable. 報的錯誤是類型轉換異常。
3)java.lang.Exception: java.io.IOException: Illegal partition for 13926435656 (4),說明partition和reducetask個數沒對上,調整reducetask個數。
4)如果分區數不是1,但是reducetask為1,是否執行分區過程。答案是:不執行分區過程。因為在maptask的源碼中,執行分區的前提是先判斷reduceNum個數是否大於1。不大於1肯定不執行。
5)在Windows環境編譯的jar包導入到linux環境中運行,
hadoop jar wc.jar com.xyg.mapreduce.wordcount.WordCountDriver /user/root/ /user/root/output
報如下錯誤:
Exception in thread "main" java.lang.UnsupportedClassVersionError: com/root/mapreduce/wordcount/WordCountDriver : Unsupported major.minor version 52.0
原因是Windows環境用的jdk1.7,linux環境用的jdk1.8。
解決方案:統一jdk版本。
6)緩存pd.txt小文件案例中,報找不到pd.txt文件
原因:大部分為路徑書寫錯誤。還有就是要檢查pd.txt.txt的問題。
7)報類型轉換異常。
通常都是在驅動函數中設置map輸出和最終輸出時編寫錯誤。
Map輸出的key如果沒有排序,也會報類型轉換異常。