1.剖析MapReduce作業運行機制
1).經典MapReduce--MapReduce1.0
整個過程有有4個獨立的實體
- 客戶端:提交MapReduce
- JobTracker:協調作業的運行
- TaskTracker:運行作業划分后的任務
- HDFS:用來在其他實體之間共享作業文件
以下為運行整體圖

A.作業的提交
JobClient的runJob是用於新建JobClient實例並調用其submitJob()方法的便捷方式,提交Job后,runJob()每秒輪詢檢測作業的進度,隨時監控Job的運行狀態。
其中JobClient的submitJob()方法所實現的作業提交過程:
- 向JobTracker請求一個新的作業ID
- 檢查作業的輸出說明
- 計算作業的輸入分片
- 將運行作業所需要的資源(Jar文件,配置文件和計算所得輸入分片)復制到一個作業ID命名的目錄下JobTracker的文件系統中。
B.作業的初始化
JobTracker接收對其提交的作業后,會把這個調用放入一個隊列,交由作業調度器調度,初始化。初始化包括創建一個表示正在運行作業的對象---封裝任務和記錄信息,以便跟蹤任務的狀態和進程
C.任務的分配
TaskTracker運行簡單的循環來對JobTracker發送心跳,告知自己的是否存活,同時交互信息,對於map任務和reduce任務,TaskTracker會分配適當的固定數量的任務槽,理想狀態一般遵循數據本地化,和機架本地化
D.任務的執行
第一步:TaskTracker拷貝JAR文件到本地,第二部:TaskTracker新建本地目錄,將JAR文件加壓到其下面;第三步:TaskTracker新建一個TaskRunner實例運行該任務。
Streaming和Pipes可運行特殊的Map和Reduce任務,Streaming支持多語言的編寫,Pipes還可以與C++進程通信,如下圖:

E.進程和狀態的更新
通過Job的Status屬性對Job進行檢測,例如作業雲習慣狀態,map和reduce運行的進度、Job計數器的值、狀態消息描述等等,尤其對計數器Counter(計數器)屬性的檢查。狀態更新在MapReduce系統中的傳遞流程如下

F.作業的完成
當JobTracker收到Job最后一個Task完成的消息時候便把Job的狀態設置為”完成“,JobClient得知后,從runJob()方法返回
2).Yarn(MapReduce 2.0)
Yarn出現在Hadoop 0.23和2.0版本中,相對前面 MapReduce的性能有不少的提高
相比較MapReduce1.0,JobTracker在MRv2 中被拆分成了兩個主要的功能使用守護進程執行:資源管理和任務的調度與監視。這個想法創建一個全局的資源管理(global ResourceManager (RM))和為每個應用創建一個應用管理(ApplicationMaster (AM))。一個應用可以使一個MR jobs的經典場景或者是一串連續的Job
ResourceManager 和每個slave節點的NodeManager (NM)構成一個資源估算框架。ResourceManager 對在系統中所有應用的資源分配擁有最終的最高級別仲裁權。其中ResourceManager 擁有兩個主要的組件:調度器(Scheduler) 和資源管理器(ApplicationsManager)
實際上每個應用的ApplicationMaster(AM)是資源估算框架具體用到的lib包,被用來和ResourceManager 進行資源談判,並且為NodeManager執行和監控task
整體如下圖:

綜上,在Hadoop Yarn中有5個獨立的實體
- 客戶端:用來提交MapReduce作業(Job)的
- Yarn ResourcesManager:用來管理協調分配集群中的資源
- Yarn NodeManager:用來啟動和監控本地計算機資源單位Container的利用情況
- MapReduce Application Master:用來協調MapReduce Job下的Task的運行。它和MapReduce Task 都運行在 Container中,這個Container由RM(ResourcesManager)調度並有NM(NodeManager)管理
- HDFS:用來在其他實體之間共享作業文件
整體如下:

A.作業的提交
Job提交相似於MapReduce1.0.當在配置文件中設置mapreduce.framework.name為yarn時候,MapReduce2.0繼承接口ClientProtocol的模式就激活了,RM生成新的Job ID(從Yarn的角度看是Application ID---步驟2),接着Job客戶端計算輸入分片,拷貝資源(包括Job JAR文件、配置文件,分片信息)到HDFS(步驟3),最后用submitApplication函數提交Job給RM(步驟4)
B.作業的初始化
RM接受到由上面的A提交過來的調用,將其請求給調度器處理,調度器分配Container,同時RM在NM上啟動Application Master進程(步驟5a和5b),AM主函數MRAppMatser會初始化一定數量的記錄對象(bookkeeping)來跟蹤Job的運行進度,並收取task的進度和完成情況(步驟6),接着MRAppMaster收集計算后的輸入分片
之后與MapReduce1.0又有所不同,此時Application Master會決定如何組織運行MapReduce Job,如果Job很小,能在同一個JVM,同一個Node運行的話,則用uber模式運行(參見源碼)
C.任務的分配
如果不在uber模式下運行,則Application Master會為所有的map和reducer task向RM請求Container,所有的請求都通過heartbeat(心跳)傳遞,心跳也傳遞其他信息,例如關於map數據本地化的信息,分片所在的主機和機架地址信息,這信息幫主調度器來做出調度的決策,調度器盡可能遵循數據本地化或者機架本地化的原則分配Container
在Yarn中,不像MapReduce1.0中那樣限制map或者reduce的slot個數,這樣就限制了資源是利用率,Yarn中非配資源更具有靈活性,可以在配置文件中設置最大分配資源和最小分配資源,例如,用yarn.scheduler.capacity.minimum-allocation-mb設置最小申請資源1G,用yarn.scheduler.capacity.maximum-allocation-mb設置最大可申請資源10G 這樣一個Task申請的資源內存可以靈活的在1G~10G范圍內
D.任務的執行
分配給Task任務Container后,NM上的Application Master就聯系NM啟動(starts)Container,Task最后被一個叫YarnChild的main類執行,不過在此之前各個資源文件已經從分布式緩存拷貝下來,這樣才能開始運行map Task或者reduce Task。PS:YarnChild是一個(dedicated)的JVM
Streaming 和 Pipes 運行機制與MapReduce1.0一樣
E.進程和狀態的更新
當Yarn運行同時,Task和Container會報告它的進度和狀態給Application Master,客戶端會每秒輪詢檢測Application Master,這樣就隨時收到更新信息,這些信息也哭通過Web UI來查看

F.作業的完成
客戶端每5秒輪詢檢查Job是否完成,期間需要調用函數Job類下waitForCompletion()方法,Job結束后該方法返回。輪詢時間間隔可以用配置文件的屬性mapreduce.client.completion.pollinterval來設置
2.失敗情況
1)經典MapReduce---MapReduce1.0
A.TasK失敗
第一種情況:map或reduce任務中的用戶代碼拋出運行異常,此時子進程JVM進程會在退出之前想TaskTracker發送錯誤報告,錯誤報告被記錄錯誤日志,TaskTracker會將這個任務(Task)正在運行的Task Attempt標記為失敗,釋放一個任務槽去運行另外一個Task Attempt
第二種情況:子進程JVM突然退出Task Tracker會注意到JVM退出,並將此Task Attempt標記為失敗
JobTracker通過心跳得知一個Task Attempt失敗后,會重啟調度該Task的執行,默認情況下如果失敗4次不會重試(通過mapred.map.max.attempts可改變這個次數),整個Job也會標記為失敗
B.TaskTracker失敗
如果TaskTracker由於崩潰或者運行過慢失敗,則停止向JobTracker發送心跳,JobTracker會注意到這點並將這個TaskTracker從等待任務調度TaskTracker池中移除
即使TaskTracker沒有失敗,也有可能因為失敗任務次數遠遠高於集群的平均失敗次數,這種情況會被列入黑名單,在重啟后才將此TaskTracker移出黑名單
C.JobTracker失敗
JobTracker失敗是是最嚴重的是愛,此時只得重新開始提交運行
2).Yarn失敗
A.Task(任務)的失敗
情況與MapReduce1.0相似,其中Task Attempt失敗這個消息會通知Application Master,由Application Master標記其為失敗。當Task失敗的比例大於mapreduce.map.failures.maxpercent(map)或者mapreduce.reduce.failures.maxpercent(reduece)時候,Job失敗
B.Application Master的失敗
與前面相似,當Application Master失敗,會被標記為失敗,這是RM會立刻探尋到AM(Application Master)的失敗,並新實例化一個AM和借助NM建造新的相應的Container,在設置yarn.app.mapreduce.am.job.recovery.enable為true情況下失敗的AM能夠恢復,並且恢復后並不返回。默認情況下,失敗AM會讓所有的Task返回
如果客戶端輪詢得知AM失敗后,經過一段時間AM失敗狀態仍然沒有改變,則重新想RM申請相應的資源
C.Node Manager的失敗
NM失敗時,會停止向RM發送心跳,則RM會將這個NM從可用的NM池中移出,心態間隔時間可由yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms設置,默認是10分鍾
如果NM上Application失敗次數過高,此NM會被列入黑名單,AM會在不同Node上運行Task
D.Resources Manager的失敗
RM的失敗是最嚴重,離開了RM整個系統將陷入癱瘓,所以它失敗后,會利用checkpoint機制來重新構建一個RM,詳細配置見權威指南和Hadoop官網
3.作業的調度
1).FIFO Scheduler
這個調度是Hadoop默認d ,使用FIFO調度算法來運行Job,並且可以設置優先級,但是這個FIFO調度並不支持搶占,所以后來的高優先級的Job仍然會被先來低優先級的Job所阻塞
2)Fair Scheduler
Fair Scheduler的目標是讓每個用戶公平的享有集群當中的資源,多個Job提交過來后,空閑的任務槽資源可以以"讓每個用戶公平共享集群"的原則被分配,某個用戶一個很短的Job將在合理時間內完成,即便另一個用戶有一個很長的Job正在運行
一般Job都放在Job池當中,默認時,每個用戶都有自己的Job池,當一個用戶提交的Job數超過另一個用戶時,不會因此得到更多的集群資源
另外Fair Scheduler支持搶占,如果一個池的資源未在一段時間內公平得到集群資源,那么Fair Scheduler會從終止得到多余集群資源的Task,分給前者。
3).Capacity Scheduler
Capacity Scheduler中,集群資源會有很多隊列,每個隊列有一定的分配能力,在每個隊列內會按照FIFO Scheduler去分配集群資源。
4.shuffle和排序
在Hadoop Job運行時,MapReduce會確保每個reducer的輸入都按鍵排序,並且執行這個排序過程---將map的輸出所謂reducer的輸入---稱為shuffle,從許多方面看來,shuffle正是map的心臟。
以下掩蓋了一些細節,並且新版Hadoop,在這一塊有修改
1).map端
MapTask都一個環形的內存緩沖區,之后當緩沖區被占用內到達一定比例(比如80%),會啟用spill線程將緩沖區中的數據寫入磁盤,寫入磁盤前,spill線程會據根據最終要送達的reduce將數據划分為相應的partition,每個partition中,線程會按照鍵進行內排序(Haoop2.0顯示的用的是快排序),當spill線程執行處理最后一批MapTask的輸出結構后,啟用merger合並spill文件,如果設置Combiner,那接下來執行Combine函數,合並本地相同鍵的文件
2).reduce端
接下來運行ReduceTask,其中的Fetch的線程會從Map端以HTTP方式獲取相應的文件分區,完成復制map的輸出后,reducer就開始排序最后運行merger把復制過來的文件存儲在本地磁盤。(PS:在Yarn中 Map和Reduce之間的數據傳輸用到了Netty以及Java NIO 詳見源代碼)
這里需要注意的是:每趟合並目標是合並最小數量的文件以便滿足最后一趟的合並系數,eg:有40個文件,我們不會再4趟中,每趟合並10個文件然后得到4個文件,相反第一堂只合並4個文件,最后的三趟每次合並10個文件,在最后的一趟中4個已經合並的文件和余下的6個文件(未合並)進行10個文件的合並(見下圖),其實這里並沒有改變合並次數,它只是一個優化措施,盡量減少寫到磁盤的數據量,因為最后一趟總是合並到reduce(?這個地方合並來源來自內存和磁盤,減少了從內存的文件數,所以減少最后一次寫到磁盤的數據量)

從Map到Reducer數據整體傳輸過程如下:

3)配置的調優
調優總的原則給shuffle過程盡量多提供內存空間,在map端,可以通過避免多次溢出寫磁盤來獲得最佳性能(相關配置io.sort.*,io.sort.mb),在reduce端,中間數據全部駐留在內存時,就能獲得最佳性能,但是默認情況下,這是不可能發生的,因為一般情況所有內存都預留給reduce含函數(如需修改 需要配置mapred.inmem.merge.threshold,mapred.job.reduce.input.buffer.percent)
5.Task的執行
1).任務執行環境
Hadoop為MapTask和ReduceTask提供了運行環境相關信息,例如MapTask可以找到他所處理文件的名稱,通過為mapper和reducer提供一個configure()方法實現,表可獲得下圖中的Job的配置信息。

Hadoop設置Job的配置參數可以作為Streaming程序的環境變量。
2).推測執行(Speculative Execution)
Speculative Execution機制的為了解決Hadoop中出現緩慢某些Task拖延整個Job運行的問題,Speculative Execution會針對那些慢於平均進度的Task啟動Speculative Task,此時如果原Task在Speculative Task前完成,則Speculative Task會被終止,同樣的,如果Speculative Task先於原Task完成則原來的Task會被終止
默認情況下Speculative Execution是啟用的,下面的屬性可以控制是否開啟該功能:


3).Output Committers
Hadoop MapReduce利用commit協議確保在Job或者Task運行期間插入是適當的操作,無論他們成功完成或者失敗,在新的API中OutputCommitter由接口OutputFormat決定,
下面是OutputCommitter的API:
public abstract class OutputCommitter { public abstract void setupJob(JobContext jobContext) throws IOException; public void commitJob(JobContext jobContext) throws IOException { } public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException { } public abstract void setupTask(TaskAttemptContext taskContext) throws IOException; public abstract boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException; public abstract void commitTask(TaskAttemptContext taskContext) throws IOException; public abstract void abortTask(TaskAttemptContext taskContext) throws IOException; }
其中當將要運行Job時候運行setupJob()方法;當Job運行成功后調用commitJob()方法,並且輸出目錄后綴_SUCCESS;當Job沒有成功運行,則調用abortJob(),並且刪除相應的輸出文件;相類似Task執行成功調用commitTask()方法,Task執行失敗調用abortTask()方法,並且刪掉相應生成的文件
4).Task JVM重用
啟動JVM重用后,可以讓不同Task重用一個JVM,節省了重建銷毀JVM的時間,在Hadoop2.0中默認情況不提供這種功能
5).跳過壞記錄
在大數據中經常有一些損壞的記錄,當Job開始處理這個損壞的記錄時候,導致Job失敗,如果這些記錄並不明顯影響運行結果,我們就可以跳過損壞記錄讓Job成功運行
一般情況,當任務失敗失敗兩次后會啟用skipping mode,對於一直在某記錄失敗的Task,NM或者TaskTracker將會運行一下TaskAttempt
A.任務失敗
B.任務失敗
C.開啟skipping mode。任務失敗,失敗記錄由TaskTracker或者NM保存
D.仍然啟用skipping mode,任務繼續運行,但是跳過上一次嘗試失敗的壞記錄
在默認情況下,skipping mode是關閉的 可以用SkipBadRecords類來啟用該功能
