一 概述
MRv1主要由編程模型(MapReduce API)、資源管理與作業控制塊(由JobTracker和TaskTracker組成)和數據處理引擎(由MapTask和ReduceTask組成)三部分組成。而YARN出現之后。資源管理模塊則交由YARN實現,這樣為了讓MapReduce框架執行在YARN上。僅須要一個ApplicationMaster組件完畢作業控制模塊功能就可以,其他部分,包含編程模型和數據處理引擎等,可直接採用MRv1原有的部分。
二 MRAppMaster組成
MRAppMaster是MapReduce的ApplicationMaster實現。它使得MapReduce應用程序能夠直接執行於YARN之上。在YARN中,MRAppMaster負責管理MapReduce作業的生命周期。包含作業管理、資源申請與再分配、Container啟動與釋放、作業恢復等。
MRAppMaster 主要由已下幾種組件/服務組成:
ConainterAllocator
與RM通信,為MapReduce作業申請資源。作業的每一個任務資源需求可描寫敘述為5元組:
<Priority,hostname,capacity,containers,relax_locality>,分別表示作業優先級、期望資源所在的host、資源量(當前支持內存和CPU兩種資源)、Container數據是否松弛本地化
ClientService
ClientService是一個接口,由MRClientService實現。MRClientService實現了MRClientProtocol協議,client能夠通過該協議獲取作業的運行狀態(不必通過RM)和控制作業(比方殺死作業、改變作業優先級等)。
Job
表示一個MapReduce作業,與MRv1中的JobInProgress功能是一樣的。負責監控作業的執行狀態。它維護了一個作業的狀態機,以實現異步執行各種作業相關的操作。
Task
表示一個MapReduce作業的某個任務。與MRv1中的TaskInProgress功能類似。負責監控一個任務的執行狀態。它維護了一個任務狀態機。以實現異步執行各種任務相關的操作。
TaskAttempt
表示一個任務執行實例。它的執行邏輯與MRV1中的MapTask和ReduceTask執行實例全然一致。實際上,它直接使用了MRv1中的數據處理引擎,但經過了一些優化。
TaskCleaner
負責清理失敗任務或被殺死任務使用的文件夾和產生的暫時結果(統稱為垃圾數據),它維護了一個線程池和一個共享隊列。異步刪除任務產生的垃圾數據。
Speculator
完畢猜測執行功能。當同一個作業的某個任務執行速度明顯慢於其它任務時,會為該任務啟動一個備份任務。
ContainerLauncher
負責與NM通信,以啟動一個Container.當RM為作業分配資源后,ContainerLauncher會將任務執行相關信息填充到Container中。包含任務執行所需資源、任務執行命令、任務執行環境、任務依賴的外部文件等。然后與相應的NodeManager通信,要求它啟動Container.
TaskAttemptListener
負責管理各個任務的心跳信息,假設一個任務一段時間內未匯報心跳,則覺得它死掉了。會將其從系統中移除。
JobHistoryEventHandler
負責對作業的各個事件記錄日志。當MRApMaster出現問題時。YARN會將其又一次調度到還有一個節點上。
未了避免又一次計算。MRAppMaster首先從HDFS上讀取上次執行產生的日志,以恢復已經完畢的任務,進而可以僅僅執行尚未執行完畢的任務。
三 MapReduceclient
MapReduceclient是MapReduce用戶與YARN進行通信的唯一途徑,通過該client。用戶能夠向YARN提交作業,獲取作業的執行狀態和控制作業(比方殺死作業、殺死任務等).MapReduceclient涉及兩個RPC通信協議:
1.ApplicationClientProtol
在YARN中,RM實現了ApplicationClientProtocol協議,不論什么client須要使用該協議完畢提交作業、殺死作業、改變作業的優先級等操作。
2.MRClientProtocol
當作業的ApplicationMaster成功啟動后,它會啟動MRClientService服務,該服務實現了MRClientProtoclo協議,從而同意client直接通過該協議與ApplicationMater通信以控制作業和查詢作業執行狀態。以減輕ResourceManager負載。
四 MRAppMaster工作流程
依照作業的大小不同。MRAppMaster提供了三種作業執行模式:
本地模式(通經常使用於作業調試,同MRv1一樣,不再贅述)、Uber模式和Non-Uber模式。
對於小作業為了減少延遲。可採用Uber模式,在該模式下,全部Map Task和Reduce Task在同一個Container(MRAppMaster所在的Container)中順次執行;對於大作業。則採用Non-Uber模式,在該模式下,MRAppMaster先為Map Task申請資源。當Ma Task執行完畢數目達到一定比例之后再為Reduce Task申請資源。
對於Map Task而言。它的生命周期為Scheduled->assigned->completed;
而對於Reduce Task而言,它的生命周期為pending->scheduled->assigned->completed.
在YARN之上執行MapReduce作業須要解決兩個關鍵問題:怎樣確定Reduce Task啟動時機以及怎樣完畢Shuffle功能。
為了避免Reduce Task過早啟動造成資源利用率低下,MRAppMaster讓剛啟動的Reduce Task處於pending狀態。以便可以依據Map Task執行情況決定是否對其進行調度。
MRAppMaster在MRv1原有策略基礎之上加入了更為嚴格的資源控制策略和搶占策略。在YARN中。NodeManager作為一種組合服務模式。同意動態載入應用程序暫時須要的附屬服務,利用這一特性,YARN將Shuffle HTTP Sever組成一種服務,以便讓各個NodeManager啟動時載入它。
當用戶向YARN提交一個MapReduce應用程序后,YARN 將分兩個階段執行該應用程序:第一個階段是由ResourceManager啟動MRAppMaster;第二個階段是由MARppMaster創建應用程序。為它申請資源。並監控它的整個執行過程。直到執行完畢。
步驟1 用戶向YARN中(RM)提交應用程序,當中包含ApplicationMaster程序、啟動ApplicationMaster的命令、用戶程序等。
步驟2 ResourceManager為該應用程序分配第一個Container,ResouceManage與某個NodeManager通信,啟動應用程序ApplicationMaster,NodeManager接到命令后,首先從HDFS上下載文件(緩存),然后啟動ApplicationMaser。
當ApplicationMaster啟動后,它與ResouceManager通信,以請求和獲取資源。ApplicationMaster獲取到資源后,與相應的NodeManager通信以啟動任務。
注:1.假設該應用程序第一次在給節點上啟動任務。則NodeManager首先從HDFS上下載文件緩存到本地,這個是由分布式緩存實現的。然后啟動該任務。
2. 分布式緩存並非將文件緩存到集群中各個結點的內存中,而是將文件換到各個結點的磁盤上,以便運行任務時候直接從本地磁盤上讀取文件。
步驟3 ApplicationMaster首先向ResourceManager注冊。這樣用戶能夠直接通過ResourceManage查看應用程序的執行狀態,然后它將為各個任務申請資源。並監控它們的執行狀態。直到執行結束,即反復步驟4~7。
步驟4 ApplicationMaster採用輪詢的方式通過RPC協議向ResourceManager申請和領取資源。
步驟5 一旦ApplicationMaster申請到資源后,ApplicationMaster就會將啟動命令交給NodeManager,要求它啟動任務。啟動命令里包括了一些信息使得Container能夠與Application Master進行通信。
步驟6 NodeManager為任務設置好執行環境(包含環境變量、JAR包、二進制程序等)后,將任務啟動命令寫到一個腳本中,並通過執行該腳本啟動任務(Container)。
步驟7 在Container內執行用戶提交的代碼,各個Container通過某個RPC協議向ApplicationMaster匯報自己的狀態和進度,以讓ApplicationMaster隨時掌握各個任務的執行狀態,從而能夠在任務失敗時又一次啟動任務。
步驟8 在應用程序執行過程中,用戶可隨時通過RPC向ApplicationMaster查詢應用程序的當前執行狀態。
步驟9 應用程序執行完畢后。ApplicationMaster向ResourceManager注銷並關閉自己
五 MRAppMaster 生命周期
MRAppMaster依據InputFormat組件的詳細實現(一般是依據數據量切分數據),將作業分解成若干個Map Task和Reduce Task,當中每一個Map Task 負責處理一片Inputsplit數據,而每一個Reduce Task則進一步處理Map Task產生的中間結果。每一個Map/Reduce Task僅僅是一個詳細計算任務的描寫敘述,真正的任務計算工作則是由執行實例TaskAttempt完畢的。每一個Map/Reduce Task可能順次啟動多個執行實例,比方第一個執行實例失敗了,則另起一個新的實例又一次計算,直到這一份數據處理完畢或者嘗試次數達到上限。
Job狀態機
Job狀態機維護了一個MapReduce應用程序的生命周期,即從提交到執行結束的整個過程。一個Job由多個Map Task和Reduce Task構成。而Job狀態機負責管理這些任務。Job狀態機由類JobImpl實現。
Task狀態機
Task維護了一個任務的生命周期。即從創建到執行結束整個過程。一個任務可能存在多次執行嘗試。每次執行嘗試被稱為一個“執行實例”,Task狀態機負責管理這些執行實例。Task狀態機由TaskImpl實現。
注意:1.MRAppMaster為任務申請到資源后,與相應的NodeManager通信成功啟動Container。須要注意的是,在某一個時刻,一個任務可能有多個執行實例,且可能存在執行失敗的實例。可是僅僅要有一個實例執行成功,則意味着該任務執行完畢。
2. 每一個任務的執行實例數目都有一定上限,一旦超過該上限,才覺得該任務執行失敗,當中Map Task執行實例數目上限默認位4,Reduce Task執行實例默認也是 4.一個任務的失敗並不一定導致整個作業執行失敗,這取決於作業的錯誤容錯率。
TaskAttempt狀態機
TaskAttempt狀態機維護了 一個任務執行實例的生命周期,即從創建到執行結束整個過程。它由TaskAttempImpl類實現。
在YARN 中,任務實例是執行在Container中的。因此。Container狀態的變化往往伴隨任務實例的狀態變化,比方任務實例執行完畢后。會清理Container占用的空間,而Container空間的清理實際上就是任務實例空間的清理。任務實例執行完后,需向MRAppMaster請求提交終於結果,一旦提交完畢后。該任務的其他實例就將被殺死。
總結一個作業的運行過程大致例如以下:
創建實例=》MRApMaster向ResourceManager申請資源=》獲得Container=》啟動Container(執行實例)=》提交執行結果=》清理結果
當一個Container執行結束后,MRAppMaster可直接從ResourceManager上獲知。各個任務執行實例需定期向MRAppMaster匯報進度和狀態,否則MRAppMaster覺得該任務處於僵死狀態。會將它殺死,每次匯報均會觸發一個TA_UPDATE事件。
注:1.MRAppMaster能夠由兩條路徑來得知Conainer的當前執行狀態:
a. 通過ResourceManager(MRAppMaster與ResouceManager中維護一個心跳信息)
b. 還有一個是直接通過Task Attempt(每一個Task Attempt與MRAppMaster之間有專用的協議)
2. 這兩條路徑是獨立的,沒有先后順序之分,假設MRAppMaster直接從ResouceManager獲取Container執行完畢信息。則任務實例直接從Running轉化為SUCCESS_CONTAINER_CLEANUP狀態,假設首先從TaskAttempt中獲知任務完畢信息。則將首先轉化為COMMIT_PENDING狀態。然后再轉化為SUCCESS_CONTAINER_CLEANUP狀態。
當任務運行失敗或者被殺死時,需清理它占用的磁盤空間和產生的結果。當Reduce Task遠程復制一個已經運行完畢的Map Task輸出數據時,可能由於磁盤或者網絡等原因,導致數據損壞或者數據丟失。這是會觸發一個TA_TOO_MANY_FETCH_FAILURE事件。從而觸發MRAppMaster又一次調度運行該Map Task.
六 資源申請和再分配
ContainerAllocator是MRAppMaster中負責資源申請和分配的模塊。用戶提交的作業被分解成Map Task和Reduce Task后,這些Task所需的資源統一由ContainerAllocator模塊負責從ResourceManager中申請,而一旦ContainAllocator得到資源后,需採用一定的策略進一步分配給作業的各個任務。
在YARN中,作業的資源描寫敘述能夠被描寫敘述為五元組:priority,hostname,capabiity,containers,relax_locality分別表示 作業優先級 期望資源所在的host 資源量(當前支持內存與CPU兩種資源) 、Containers數目 是否松弛本地化。比如:
<10,"node1","memeory:1G,CPU:1",3,true)// 優先級是一個正整數,優先級值越小,優先級越高
ContainerAllocator周期性的通過心跳與ResourceManager通信。以獲取已經分配的Contaienr列表,完畢的Container列表、近期更新的節點*+列表等信息,而ContanerAllocator依據這些信息完畢對應的操作。
當用戶提交作業之后,MRAppMaster會為之初始化,並創建一系列的Map Task和TaskReduce Task任務,因為Reduce Task依賴於Map Task之間的結果,所以Reduce Task會延后調度。
任務狀態描寫敘述
Map:
scheduled->assigned->completed
Task:
pending-> scheduled->assigned->completed
pending 表示等待ContainerAllocator發送資源請求的集合
scheduled 標識已經發送了資源申請給RM。但還沒收到分配的資源的任務集合
assignd 已經受到RM分配的資源的任務集合
complet 表示已完畢的任務集合
三種作業狀態:Failed Map Task ,Map Task,Reduce Task分別賦予它們優先級5 20 10也就是說,當三種任務同一時候有資源請求的時候。會優先分配給Failed Map Task,然后是Reduce Task,最后是Map Task.
假設一個任務執行失敗,則會又一次為該任務申請資源
假設一個任務執行速度過慢。則會為其額外申請資源已啟動備份任務(假設啟動了猜測執行過程)
假設一個節點的失敗任務數目過多,則會撤銷對該節點的全部資源的申請請求。
注:在大多數數的情況下,RMAppMaster與RM的心跳信息都是空的。即心跳信息不包括新的資源請求信息,這樣的心跳信息有一下幾個作用:
1. 周期性發送心跳。告訴RM自己還活着
2. 周期性詢問RM,以獲取新分配的資源和各個Container執行狀況。
資源再分配
一旦MRAppMaster收到新分配的Container后,會將這些Container進一步分配給各個任務。Container分配步驟例如以下:
1.推斷新收到的Container包括的資源是否滿足,假設不滿足。則通過下次心跳通知ResourceManager釋放該Container.
2.推斷收到的Container所在的節點是否被增加到黑名單中,假設是。則尋找一個與該Container匹配的任務,並又一次為該任務申請資源。同一時候通過下次心跳通知ResourceManager釋放該Container.
3.依據Container的優先級。將它分配給相應類型的任務。
七 Contianer啟動和釋放
當ContainerAllocator為某個任務申請到資源后,會將執行該任務相關的全部信息封裝到Container中。並要求相應的節點啟動該Container。須要注意的是。Container中執行的任務相應的數據處理引擎與MRv1中全然一致,仍為Map Task和 Reduce Task。正由於如此。MRv1的程序與YARN中的MapReduce程序全然兼容。
ContainerLaunche負責與各個NodeManager通信,已啟動或者釋放Container。在YARN中。執行的Task所需的所有信息被封裝到Container中,包含所需的資源、依賴的外部文件、JAR包、執行時環境變量、執行命令等。ContainerLauncher通過RPC協議ContainerManager與NodeManager通信,以控制Container的啟動和釋放。進而控制任務的執行(比方啟動任務、殺死任務等)。
有多種可能觸發停止/殺死一個Container,常見的有:
1.猜測執行時一個任務執行完畢,需殺死還有一個同樣輸入數據的任務。
3.隨意一個任務執行結束時,YARN會觸發一個殺死任務的命令,以釋放相應的Container占用的資源。
八 猜測運行機制
為了防止運行速度慢的任務拖慢總體的運行進度,使用猜測運行機制,Hadoop會為該任務啟動一個備份任務,讓該備份任務與原始任務同一時候處理同一份數據,誰先運行完。則將誰的結果作為終於結果。
注:1.每一個任務最多僅僅能有一個備份任務實例
2. 啟動備份的時候。必須保證已經有足夠多的Map任務已經完畢,依據這些完畢的任務來估算是否來啟動備份任務。
這樣的算法的長處是可最大化備份任務的有效率。當中有效率指有效備份任務數與全部備份任務數的比值,有效任務是指完畢時間早於原始任務完畢時間的備份任務(即帶來實際收益的備份任務)。備份任務的有效率越高。猜測運行算法越優秀,帶來的收益也就越大。
猜測運行機制實際上採用了經典的算法優化方法,以空間換時間,它同一時候啟動多個同樣的任務處理同樣的數據,並讓這些任務競爭以縮短數據的處理時間。
八 作業恢復
從作業恢復粒度角度來看,當前存在三種不同級別的恢復機制,級別由低到高依次是作業級別、任務級別和記錄級別。當中級別越低實現越簡單,但造成的資源浪費也越嚴重。當前MRAppMaster採用了任務級別的恢復機制,即以任務為基本單位進行恢復,這樣的機制是基於事務型日志完畢作業恢復的,它僅僅關注兩種任務:執行完畢的任務和未完畢的任務。作業執行過程中,MRAppMaster會以日志的形式將作業以及狀態記錄下來,一旦MRAppMaster重新啟動,則可從日志中恢復作業的執行狀態。
當前MRAppMaster的作業恢復機制僅能做到恢復上一次已經執行完畢的任務,對於正在執行的任務。則在前一次MRAppMaster執行實例退出時由ResourceManager強制將其殺死並回收資源。
MRAppMaster採用了開源數據序列化工具Apache Avro記錄這些事件。Avro是一個數據序列化系統,通經常使用於支持大批數據交換和跨語言RPC的應用。
九 MRv1與MRv2簡單對照
MRAppMaster仍採用了MRv1中的數據處理引擎。分別由數據處理引擎MapTask和ReduceTask完畢Map任務和Reduce任務的處理。
MRv1與MRv2的比較
MRv2中在Map端 用Netty取代Jetty. Reduce端採用批拷貝、shuffle和排序插件化
應用程序編程接口 新舊API 新舊API
執行時環境 由JobTracker與TaskTracker組成 YARN (由RM和NM組成)和MRAppMaster
數據處理引擎 MapTask/Reduce Task MapTask/Reduce Task
須要注意的是。YARN並不會改變MapReduce編程模型,它僅僅是應用開發者使用的API。YARN提供了一種新的資源管理模型和實現,用來 執行MapReduce任務。因此。在最簡單的情況下。現有的MapReduce應用仍然能照原樣執行(須要又一次編譯),YARN僅僅只是能讓開發者更精 確地指定運行參數。
十 小結
MapRecuce On YARN的執行時環境由YARN與ApplicationMaster構成,這樣的新穎的執行時環境使得MapReduce能夠與其它計算框架執行在一個集群中,從而達到共享集群資源、提高資源利用率的目的。
隨着YARN的程序與完好,MRv1的獨立執行模式將被MapRedcue On YARN代替。