下面結合具體的例子詳述MapReduce的工作原理和過程。
以統計一個大文件中各個單詞的出現次數為例來講述,假設本文用到輸入文件有以下兩個:
文件1:
big data
offline data
online data
offline online data
文件2
hello data
hello online
hello offline
目標是統計這兩個文件中各個單詞的出現次數,很容易用肉眼算出各個詞出現的次數:
big:1
data:5
offline:3
online:3
hello:3
但是想象一下,如果是數以百萬級的文獻資料,每個文獻資料數以十萬字或百萬字計,還能用肉眼算嗎?而這正是Hadoop擅長的,對應Hadoop來說只需要定義簡單的Map邏輯和Reduce邏輯,然后把輸入文件和處理邏輯提交
給Hadoop即可,Hadoop將會自動完成所有的分布式計算任務。
1、MapReduce邏輯開發
Hadoop開發人員需要定義Map邏輯和Reduce邏輯,下面用偽代碼來描述詞頻統計具體的Map邏輯和Reduce邏輯。
詞頻統計任務的Map邏輯為:
以上述實例文件1為例,上述Map邏輯執行后,將會輸出:
big:1
data:1
offline:1
data:1
online:1
data:1
offline:1
online:1
data:1
Hadoop的shuffle過程會把Map任務的輸出組織成<word,{1,1,1,1....}形式的數據並輸入給Reduce任務,然后Reduce任務會對這種形式的數據執行Reduce邏輯,相應的Reduce邏輯為:
至此,所有Map代碼和Reduce代碼都完成了,將此代碼打包並提及給Hadoop執行即可。
2、MapReduce任務提交詳解
從大數據開發實戰:HDFS和MapReduce優缺點分析的MapReduce架構可以看出,MapReduce作業執行主要由JobTrackerTaskTracker負責完成。
客戶端編寫好的MapReduce程序並配置好的MapReduce作業是一個Job,Job被提交給JobTracker后,JobTracker會給該Job一個新的ID值,接着檢查該Job指定的輸出目錄是否存在、輸入文件是否存在,
如果不存在,則拋出錯誤。同時,JobTracker會根據輸入文件計算輸入分片(input split),這些都檢查通過后,JobTracker就會配置Job需要的資源並分配資源,然后JobTracker就會初始化作業,
也就是將Job放入一個內部的隊列,讓配置好的作業調度器能調度這個作業,作業調度器會初始化這個Job,初始化就是創建一個正在執行的Job對象(封裝任務和記錄信息),以便JobTracker 跟蹤Job的狀態和進程。
該Job被作業調度器調度時,作業調度器會獲取輸入分片信息,每個分片創建一個Map任務,並根據TaskTracker的忙閑情況和空閑資源等分配Map任務和Reduce任務到TaskTracker,同時通過心跳機制也可以監控到TaskTracker
的狀態和進度,也能計算出整個Job的狀態和進度。當JobTracker獲得最后一個完成指定任務的TaskTracker操作成功通知的時候,JobTracker會把整個Job狀態置為成功,然后當查詢Job運行狀態時(注意:這是個異步操作),客戶端
會查到Job完成的通知。如果job中途失敗,MapReduce也會有相應的機制處理。一般而言,如果不是程序員程序本身有bug,MapReduce錯誤處理機制都能保證提交的Job能正常完成。
3、MapReduce內部執行原理詳解
那么,MapReduce到底是如何運行的呢? 按照時間順序,MapReduce任務執行包括:輸入分片Map、Shuffle和Reduce等階段,一個階段的輸出正好是下一個階段的輸入,上述各個階段的關系和流程如下:
下面結合上文的實例問更加深入和詳細地介紹上述過程,如下圖:
4、各環節介紹
4.1、輸入分片
在進行Map計算之前,MapReduce會根據輸入文件計算輸入分片。每個輸入分片對應一個Map任務,輸入分片存儲的並非數據本身,而是一個分片長度和一個記錄數據的位置的數組。輸入分片往往和HDFS和block(塊)
關系密切,假如設定的HDFS的塊的大小是64MB,如果輸入只有一個150MB,那么MapReduce會把此大文件切分為三片(分別為:64MB、64MB和22MB),同樣,如果輸入為兩個文件,其大小分別是22MB和100MB,那么
MapReduce會把20MB文件作為一個輸入分片,100MB則切分為兩個即64MB和36MB的輸入分片。對於上述實例文件1和文件2,由於非常小,因此分別被作為split1和split2輸入Map任務1和2中(此處只為說明問題,實際處理
中應該將小文件進行合並,否則如果輸入多個文件而且文件大小均遠小於塊大小,會導致生成多個不必要的Map任務,這也是MapReduce優化計算的一個關鍵點)。
4.2、Map階段
在Map階段,各個Map任務會接收到所分配的split,並調用Map函數,逐行執行並輸出鍵值。比如對於上面的例子,map task1 將會接收到input split1,並調用Map函數,其輸出如下的鍵值對:
big 1, data 1, offline 1, data 1, online 1, data 1, offline 1, online 1, data 1
4.3、Combiner 階段
Combiner 階段是可選的的,Combiner其實也是一種Reduce操作,但它是一個本地化的Reduce操作,是Map運算的本地后續操作,主要是在Map計算出中間文件前做的一個簡單的合並重復鍵值的操作,
例如上述文件1中data出現了4次,Map計算時如果碰到一個data的單詞就會記錄1,這樣就重復了4次,Map任務輸出就有冗余,這樣后續處理和網絡傳輸都被消耗不必要的資源,一次通過Combiner操作可以解決和
優化次問題。但這一操作是有風險的,使用它的原則是Combiner的輸出不會影響到Reduce 計算的的最終輸入,例如,如果計算只是求總數、最大值及最小值,可以用Combiner操作,但是如果做平均值計算使用Combiner,
最終Reduce計算結果就會出錯。
4.4、Shuffle階段
Map任務的輸出必須經過一個名叫Shuffle的階段才能交給Reduce處理。Shuffle階段是MapReduce的核心,也是奇跡發生的地方,同時Shuffle階段的性能直接影響整個MapReduce的性能。
那什么是Shuffle呢?一般理解為數據從Map Task輸出到Reduce Task輸入的過程,它決定了Map Task的輸出如何且高效第傳輸給Reduce Task。
總的來說,Shuffle階段包含在Map和Reduce兩個階段中,在Map階段的Shuffle階段是對Map的結果進行分區(partition)、排序(sort)和分隔(spill),然后將同一分區的輸出合並在一起(merge)並寫在磁盤上,同時按照不同的
分區划分發送給對應的Reduce(Map輸出的划分和Reduce任務的對應關系由JobTracker確定)的整個過程;Reduce階段的Shuffle又會將各個Map輸出的同一個分區划分的輸出進行合並,然后對合並的結果進行排序,最后交給
Reduce處理的整個過程。
下面從Map和Reduce兩端詳細介紹Shuffle階段。
4.4.1、Map階段Shuffle
通常MapReduce計算的都是海量數據,而且Map輸出還需要對結果進行排序,內存開銷很大,因此完全在內存中完成是不可能的也是不現實的,所以Map輸出時會在內存里開啟一個環形內存緩存區,並且在配置文件里為
這個緩存區設置了一個閥值(默認是80%,可以自定義修改此配置)。同時,Map還為輸出操作啟動了一個守護線程,如果緩存區的內存使用達到了閥值,那么這個守護線程就會把80%的內存區內容寫到磁盤上,這個過程叫分隔
(spill),另外的20%內存可以供Map輸出繼續使用,寫入磁盤和寫入內存操作是互不干擾的,如果緩存區被撐滿了,那么Map就會阻塞寫入內存的操作,待寫入磁盤操作完成后再繼續執行寫入內存操作。
緩存區內容分隔到磁盤前,會首先進行分區操作,分區的數目由Reduce的數目決定。對應本例,Reduce的數目為2個,那么分區數就是2個,然后對每個分區,后台線程還會按照鍵值對需要寫出的數據進行排序,如果配置了
Combiner函數,還會進行Combiner操作,以使得更少地數據被寫入磁盤並發送給Reducer。
每次的分隔操作都會生成一個分隔文件,全部的Map輸出完成后,可能會有很多的分隔文件,因此在map 任務結束前,還要進行合並操作,即將這些分隔文件按照分區合並為單獨的文件。在合並過程中,同樣也會進行排序,
如果定義了Combiner,也會進行Combiner操作。
至此,Map階段的所有工作都已經結束,最終生成的文件也會存放在TaskTracker能訪問的某個本地目錄內。每個Reduce Task不斷地從JobTracker那里獲取Map Task是否完成的信息,如果Reduce task得到通知,獲知某台
TaskTracker上的Map Task執行完成,Shuffle的后半段過程,也就是Reduce階段的Shuffle,便開始啟動。
4.4.2、Reduce階段Shuffle
Shuffle 在Reduce階段可以分為三個階段:Copy Map輸出、Merge階段和Reduce處理。
1、Copy Map輸出:
如上文所述,Map任務完成后,會通知TaskTracker狀態已完成,TaskTracker進而通知JobTracker(這些通知一般通過心跳機制完成)。對Job來說,JobTracker記錄了Map輸出和TaskTracker的映射關系,同時
Reduce也會定期向JobTracker獲取Map的輸出與否以及輸位置,一旦拿到輸出位置Reduce就會啟動Copy線程,通過HTTP方式請求Mask Task所在的TaskTracker獲取其輸出文件。因為Map Task早已結束,這些文件就被TaskTracker
存儲在Map Task所在的本地磁盤中。
2、Merge階段:
此處的合並和Map階段的合並類似,復制過來的數據會首先放入內存緩存區中,這里的內存緩存區比Map階段的要靈活很多,它基於JVM的heap size設置,因為Shuffle階段Reduce task並不運行,因此大部分內存
應該給Shuffle使用;同時此Shuffle的合並階段根據要處理的數據量的不同,也可能會有分隔到磁盤的過程,如果設置了Combiner函數,Combiner操作也會執行。
從Map階段的Shuffle過程到Reduce階段的Shuffle過程,都提到了合並,那么合並究竟是怎樣的呢?如上面的例子,Map Task1對於offline的鍵值是2,而Map Task2的offline鍵值是1,那么合並就是將offline的鍵值合並為group,
本例即為:<offline,{2,1}>。
3、Reduce Task的輸入:
不到合並后,最后會生成一個最終結果(可能在內存,也可能在磁盤),至此,Reduce Task的輸入准備完畢,下一步就是真正的Reduce操作。
4.5、Reduce階段
經過Map和Reduce階段的Shuffle過程后,Reduce任務的輸入的准備完畢,相關的數據已經被合並和匯總,Reduce任務只需要調用Reduce函數即可,對於本例即對每個鍵,調用sum邏輯合並value並輸出到HDFS即可,比如對於
Reduce Task1的offline的鍵,只需要將集合{2,1}相加,輸出offline 3即可。
至此,整個MapReduce的詳細流程和原理介紹完畢,從上述過程中,Shuffle是整個流程中最為核心的部分,也是最復雜的部分。
參考資料:《離線和實時大數據開發實戰》