MapReduce 跑的慢 的原因
Mapreduce 程序效率的瓶頸在於兩點:
1)計算機性能
CPU、內存、磁盤健康、網絡
2)I/O 操作優化
(1)數據傾斜
(2)map 和 reduce 數設置不合理
(3)map 運行時間太長,導致 reduce 等待過久
(4)小文件過多
(5)大量的不可分塊的超大文件
(6)spill 次數過多
(7)merge 次數過多等。
MapReduce 優化方法
MapReduce 優化方法主要從六個方面考慮:數據輸入、Map 階段、Reduce 階段、IO 傳
輸、數據傾斜問題和常用的調優參數。
- 數據輸入
(1)合並小文件:在執行 mr 任務前將小文件進行合並,大量的小文件會產生大量的
map 任務,增大 map 任務裝載次數,而任務的裝載比較耗時,從而導致 mr 運行較慢。
(2)采用 CombineTextInputFormat 來作為輸入,解決輸入端大量小文件場景。
- Map 階段
1 )減少 溢寫( (spill) ) 次數:通過調整 io.sort.mb 及 sort.spill.percent 參數值,增大觸發
spill 的內存上限,減少 spill 次數,從而減少磁盤 IO。
2 )減少 合並( (merge) ) 次數:通過調整 io.sort.factor 參數,增大 merge 的文件數目,減
少 merge 的次數,從而縮短 mr 處理時間。
3)在 map 之后, 不影響業務邏輯前提下,行 先進行 combine 處理,減少 I/O。
- Reduce 階段
1 )合理設置 map 和 和 reduce 數 數:兩個都不能設置太少,也不能設置太多。太少,會導
致 task 等待,延長處理時間;太多,會導致 map、reduce 任務間競爭資源,造成處理超時
等錯誤。
2 )設置 map 、reduce 共存:調整 slowstart.completedmaps 參數,使 map 運行到一定程
度后,reduce 也開始運行,減少 reduce 的等待時間。
3) )用 規避使用 reduce :因為 reduce 在用於連接數據集的時候將會產生大量的網絡消耗。
4 )合理設置 reduce 端的 buffer :默認情況下,數據達到一個閾值的時候,buffer 中的
數據就會寫入磁盤,然后 reduce 會從磁盤中獲得所有的數據。也就是說,buffer 和 reduce
是沒有直接關聯的,中間多個一個寫磁盤->讀磁盤的過程,既然有這個弊端,那么就可以通
過參數來配置,使得 buffer 中的一部分數據可以直接輸送到 reduce,從而減少 IO 開銷:
mapred.job.reduce.input.buffer.percent,默認為 0.0。當值大於 0 的時候,會保留指定比例的
內存讀 buffer 中的數據直接拿給 reduce 使用。這樣一來,設置 buffer 需要內存,讀取數據需要內存,
reduce 計算也要內存,所以要根據作業的運行情況進行調整
- IO 傳輸
1) 采用數據壓縮的方式 式,減少網絡 IO 的的時間。安裝 Snappy 和 LZO 壓縮編碼器。
2) 使用 使用 SequenceFile 二進制文件。
- 數據傾斜問題
1)數據傾斜現象
數據頻率傾斜——某一個區域的數據量要遠遠大於其他區域。
數據大小傾斜——部分記錄的大小遠遠大於平均值。
2)減少數據傾斜的方法
方法 1 :抽樣和范圍分區
可以通過對原始數據進行抽樣得到的結果集來預設分區邊界值。
方法 2 :自定義分區
基於輸出鍵的背景知識進行自定義分區。例如,如果 map 輸出鍵的單詞來源於一本書。且其中某幾個專業詞匯較多。
那么就可以自定義分區將這這些專業詞匯發送給固定的一部分reduce 實例。而將其他的都發送給剩余的 reduce 實例。
方法 3 :Combine
使用 Combine 可以大量地減小數據傾斜。在可能的情況下,combine 的目的就是聚合並精簡數據
方法 4:用 采用 Map Join, ,免 盡量避免 Reduce Join 。
- 常用的調優參數
1)資源相關參數
(1)以下參數是在用戶自己的 mr 應用程序中配置就可以生效(mapred-default.xml)
配置參數 | 說明 |
---|---|
mapreduce.map.memory.mb | 一個 Map Task 可使用的資源上限(單位:MB),默認為 1024。如果 Map Task 實際使用的資源量超過該值,則會被強制殺死。 |
mapreduce.reduce.memory.mb | 一個 Reduce Task 可使用的資源上限(單位:MB),默認為 1024。如果 Reduce Task實際使用的資源量超過該值,則會被強制殺死。 |
mapreduce.map.cpu.vcores | 每個 Map task 可使用的最多 cpu core 數目,默認值: 1 |
mapreduce.reduce.cpu.vcores | 每個 Reduce task 可使用的最多 cpu core 數目,默認值: 1 |
mapreduce.reduce.shuffle.parallelcopies | 每個 reduce 去 map 中拿數據的並行數。默認值是 5 |
mapreduce.reduce.shuffle.merge.percent | buffer 中的數據達到多少比例開始寫入磁盤。默認值 0.66 |
mapreduce.reduce.shuffle.input.buffer.percent | buffer 大小占 reduce 可用內存的比例。默認值 0.7 |
mapreduce.reduce.input.buffer.percent | 指定多少比例的內存用來存放 buffer 中的數據,默認值是 0.0 |
(2)應該在 yarn 啟動之前就配置在服務器的配置文件中才能生效(yarn-default.xml)
配置參數 | 說明 |
---|---|
yarn.scheduler.minimum-allocation-mb 1024 | 給應用程序 container 分配的最小內存 |
yarn.scheduler.maximum-allocation-mb 8192 | 給應用程序 container 分配的最大內存 |
yarn.scheduler.minimum-allocation-vcores 1 | 每個 container 申請的最小 CPU 核數 |
yarn.scheduler.maximum-allocation-vcores 32 | 每個 container 申請的最大 CPU 核數 |
yarn.nodemanager.resource.memory-mb 8192 | 給 containers 分配的最大物理內存 |
(3)shuffle 性能優化的關鍵參數,應在 yarn 啟動之前就配置好(mapred-default.xml)
配置參數 | 說明 |
---|---|
mapreduce.task.io.sort.mb 100 | shuffle 的環形緩沖區大小,默認 100m |
mapreduce.map.sort.spill.percent 0.8 | 環形緩沖區溢出的閾值,默認 80% |
2)容錯相關參數(mapreduce 性能優化)
配置參數 | 說明 |
---|---|
mapreduce.map.maxattempts | 每個 Map Task 最大重試次數,一旦重試參數超過該值,則認為 Map Task 運行失敗,默認值:4。 |
mapreduce.reduce.maxattempts | 每個 Reduce Task 最大重試次數,一旦重試參數超過該值,則認為 Map Task 運行失敗,默認值:4。 |
mapreduce.task.timeout | Task 超時時間,經常需要設置的一個參數,該參數表達的意思為:如果一個 task 在一定時間內沒有任何進入,即不會讀取新的數據,也沒有輸出數據,則認為該 task處於 block 狀態,可能是卡住了,也許永遠會卡主,為了防止因為用戶程序永遠 block 住不退出,則強制設置了一個該超時時間(單位毫秒),默認是 600000。如果你的程序對每條輸入數據的處理時間過長(比如會訪問數據庫,通過網絡拉取數據等),建議將該參數調大,該 參 數 過 小 常 出 現 的 錯 誤 提 示 是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by theApplicationMaster.”。 |
大量小文件的優化策略
- HDFS 小文件弊端
HDFS 上每個文件都要在 namenode 上建立一個索引,這個索引的大小約為 150byte,這
樣當小文件比較多的時候,就會產生很多的索引文件,一方面會大量占用 namenode 的內存
空間,另一方面就是索引文件過大是的索引速度變慢
- 解決方案
1)Hadoop Archive:
是一個高效地將小文件放入 HDFS 塊中的文件存檔工具,它能夠將多個小文件打包成
一個 HAR 文件,這樣就減少了 namenode 的內存使用
2)Sequence file:
sequence file 由一系列的二進制 key/value 組成,如果 key 為文件名,value 為文件內容,
則可以將大批小文件合並成一個大文件。
3)CombineTextInputFormat:
CombineTextInputFormat是一種新的 inputformat,用於將多個文件合並成一個單獨的
split,另外,它會考慮數據的存儲位置。
4)開啟 JVM 重用
對於大量小文件 Job,可以開啟 JVM 重用會減少 45%運行時間。
JVM 重用理解:一個 map 運行一個 jvm,重用的話,在一個 map 在 jvm 上運行完畢后,
jvm 繼續運行其他 map。
具體設置:mapreduce.job.jvm.numtasks 值在 10-20 之間。
- TextFileInputFormat
在Input時,將小文件組合成大文件
如果已存在HDFS中,可以用CombineTextInputFormat進行切片,
他可以將多個小文件從邏輯上規划到一個切片上,這樣就可以將多個小文件放到一個MapTask中處理
1)默認情況下 TextInputformat 對任務的切片機制是按文件規划切片,不管文件多小,都會
是一個單獨的切片,都會交給一個 maptask,這樣如果有大量小文件,就會產生大量的
maptask,處理效率極其低下。
2)優化策略
(1)最好的辦法,在數據處理系統的最前端(預處理/采集),將小文件先合並成大文
件,再上傳到 HDFS 做后續分析。
(2)補救措施:如果已經是大量小文件在 HDFS 中了,可以使用另一種 InputFormat
來做切片(CombineTextInputFormat),它的切片邏輯跟 TextFileInputFormat 不同:它可以
將多個小文件從邏輯上規划到一個切片中,這樣,多個小文件就可以交給一個 maptask。
(3)優先滿足最小切片大小,不超過最大切片大小
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
舉例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m
3)具體實現步驟
// 如果不設置 InputFormat,它默認用的是 TextInputFormat.class
job.setInputFormatClass(CombineTextInputFormat.class)
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
實操案例:
### 默認是TextInputFormat
代碼如WordCount,不進行修改,直接運行,我們會看到控制台打印:
number of splits:5
5個小文件分為了5個切片
#### 修改代碼 將合並文件之后進行切片
設置InputFormatClass為CombineTextInputFormat
在Runner中設置: 注意導包 mapreduce.input
job.setInputFormatClass(CombineTextInputFormat.class)
//設置切片最大最小值
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);// 4m
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);// 2m
#### 切片如果計算 優先滿足最小切片 設置最小切片大小為2M 最大切片大小為4M
4個文件:分別是0.5M,1M,0.3M,5M
舉例:0.5m+1m+0.3m+5m=2m + 4.8m=2m + 4m + 0.8m
4個文件,優先滿足最小切片,所以先組裝2M
0.5+1+0.3+0.2 (0.2是從5M中取出來) 這樣滿足2M 形成一個切片
4.8 M 超過最大 4M 是一個切片
0.8M 是剩下的一個切片
一共分為了3個切片
本博客僅為博主學習總結,感謝各大網絡平台的資料。蟹蟹!!