操作:
MapReduce框架將文件分為多個splits,並為每個splits創建一個Mapper,所以Mappers的個數直接由splits的數目決定。而Reducers的數目可以通過job.setNumReduceTasks()函數設置
1、Map任務的個數:
理論值:
1、Mapper數據過大的話,會產生大量的小文件,過多的Mapper創建和初始化都會消耗大量的硬件資源Mapper數太小,並發度過小,Job執行時間過長,無法充分利用分布式硬件資源
2、map並行度是大約每個節點10-100個map,且最好每個map的執行時間至少一分鍾
合理map個數:
經驗總結:
1、hdfs上的數據進行大塊化
1)如果某個input的文件非常的大,比如 1TB,可以考慮將hdfs上的每個block size設大,這樣map和reduce的數據可以減小。還可以通過命令 :hadoop distcp -Ddfs.block.size=$[256*1024*1024] /path/to/inputdata /path/to/inputdata-with-largeblocks的方式來將已經存在hdfs上的數據進行大塊化。然后刪除掉原先的文件。
2)通常對於每一個輸入的文件會有一個map split。如果輸入文件太大,超過了hdfs塊的大小(128M)那么對於同一個輸入文件我們會有多余2個的map運行起來,會有一個比例進行運算來進行切片,為了減少資源的浪費;
例如一個文件大小為260M,在進行MapReduce運算時,會首先使用260M/128M,得出的結果和1.1進行比較大於則切分出一個128M作為一個分片,剩余132M,再次除以128,得到結果為1.03,小於1.1則將132作為一個切片,即最終260M被切分為兩個切片進行處理,而非3個切片
2、MapReduce讀取HBase表時會通過配置信息獲取HBase表名,然后構造一個HTable對象,未設定開始和結束rowkey,mapper任務數默認是和表分區數相等,詳細參考TableInputFormat的父類TableInputFormatBase;
重新實現一個TableInputFormat類,重寫其中的getSplits()方法,可以自定義實現一個region對應N個Mapper任務
2、reduce任務的個數:
理論值
Reduce任務是一個數據聚合的步驟,數量默認為1。而使用過多的Reduce任務則意味着復雜的shuffle,並使輸出文件的數量激增。
在真正的集群環境下,如果默認,那么所有的中間數據會發送給唯一的Reducer,導致任務變得非常緩慢。通常reduce數量是0.95或者1.75*( nodes * mapred.tasktracker.reduce.tasks.maximum);mapred.tasktracker.tasks.reduce.maximum(mapreduce.tasktracker.reduce.tasks.maximum):一個節點Reduce任務數量上限(默認是2),實際中一般設置為各節點cpu core數量,即能同時計算的slot數量。
合理reduce個數:
可以采用以下兩種方式決定Reduce任務的合理數量:
1.每個reducer都可以在Map任務完成后立即啟動: 0.95 * (節點數量 * mapreduce.tasktracker.reduce.tasks.maximum)
2.較快的節點在完成第一個Reduce任務后,可以開始第二輪的reduce任務:1.75 * (節點數量 * mapreduce.tasktracker.reduce.tasks.maximum)
究竟設多少個Reducers合適呢?為了解決這個問題,首先來了解一下slots的概念
1、slots有點類似一個資源池,每個任務(map和reduce)執行時都必須獲得一個slot才能繼續,否則只能等待。當一個任務完成后,該任務就歸還slot,這個過程有點類似釋放資源到資源池中。顯然,每一個獲得資源的任務都可以立即執行,無需等待。另一方面,mapreduce的任務由tasktracker節點負責執行的,所以slots可進一步理解為tasktrackers能夠並發執行多個任務。slots分為mapper slots和reducer slots,分別對應最大可並行執行的mapper和reducer數
2、reducers 數目的最佳值和reducer slots的總數有關,通常情況下,讓reducers的數目略小於reducer slots的總數,這樣的目的:首先reducers可以並行執行,減少排隊時間;其次對於未執行reducer的slots可以在其他reducer發生故障時,立即分配給新創建的reducer,不會明顯 加長任務總時間。
3、出現reducers > mappers的情況就不合理了,這樣有些mappers會工作消耗資源開銷,但是對任務沒有任何幫助。
經驗總結:
1)job的每個map或者reduce task的運行時間都只有30-40秒鍾,那么就減少該job的map或者reduce數,每一個task(map|reduce)的setup和加入到 調度器中進行調度,這個中間的過程可能都要花費幾秒鍾,所以如果每個task都非常快就跑完了,就會在task的開始和結束的時候浪費太多的時間
2)只要每個task都運行至少30-40秒鍾,就可以考慮將mapper數擴大,比如集群的map slots為100個,那么就不要將一個job的mapper設成101,這樣前100個map能夠並行完成,而最后一個map要在前100個 mapper結束后才開始,因此在reduce開始運行前,map階段的時間幾乎就要翻倍
3)盡量不要運行太多的reduce task。對大多數job來說,最好rduce的個數最多和集群中的reduce持平,或者比集群的 reduce slots小。這個對於小集群而言,尤其重要。
3、JVM重用技術:調節mapred.job.reuse.jvm.num.tasks參數值
1)mapred.job.reuse.jvm.num.tasks默認是1,表示一個JVM上最多可以順序執行的task數目(屬於同一個Job)是1。也就是說一個task啟一個JVM為每個task啟動一個新的JVM將耗時1秒左右,對於運行時間較長(比如1分鍾以上)的job影響不大,但如果都是時間很短的task,那么頻繁啟停JVM會有開銷。
2)如果我們想使用JVM重用技術來提高性能,那么可以將mapred.job.reuse.jvm.num.tasks設置成大於1的數。這表示屬於同一job的順序執行的task可以共享一個JVM,也就是說第二輪的map可以重用前一輪的JVM,而不是第一輪結束后關閉JVM,第二輪再啟動新的JVM。
3)那么最多一個JVM能順序執行多少個task才關閉呢?這個值就是mapred.job.reuse.jvm.num.tasks。如果設置成-1,那么只要是同一個job的task(無所謂多少個),都可以按順序在一個JVM上連續執行。
4)如果task屬於不同的job,那么JVM重用機制無效,不同job的task需要不同的JVM來運行。