大數據開發實戰:MapReduce內部原理實踐


    下面結合具體的例子詳述MapReduce的工作原理和過程。

    以統計一個大文件中各個單詞的出現次數為例來講述,假設本文用到輸入文件有以下兩個:

    文件1:

      big data

      offline data

      online data

      offline online data

    文件2

      hello data

      hello online

      hello offline

    目標是統計這兩個文件中各個單詞的出現次數,很容易用肉眼算出各個詞出現的次數:

    big:1

    data:5

    offline:3

    online:3

    hello:3

    但是想象一下,如果是數以百萬級的文獻資料,每個文獻資料數以十萬字或百萬字計,還能用肉眼算嗎?而這正是Hadoop擅長的,對應Hadoop來說只需要定義簡單的Map邏輯和Reduce邏輯,然后把輸入文件和處理邏輯提交

    給Hadoop即可,Hadoop將會自動完成所有的分布式計算任務。

 

   1、MapReduce邏輯開發

      Hadoop開發人員需要定義Map邏輯和Reduce邏輯,下面用偽代碼來描述詞頻統計具體的Map邏輯和Reduce邏輯。

      詞頻統計任務的Map邏輯為:

      

      以上述實例文件1為例,上述Map邏輯執行后,將會輸出:

      big:1

      data:1

      offline:1

      data:1

      online:1

      data:1

      offline:1

      online:1

      data:1

      Hadoop的shuffle過程會把Map任務的輸出組織成<word,{1,1,1,1....}形式的數據並輸入給Reduce任務,然后Reduce任務會對這種形式的數據執行Reduce邏輯,相應的Reduce邏輯為:

      

      至此,所有Map代碼和Reduce代碼都完成了,將此代碼打包並提及給Hadoop執行即可。

     

   2、MapReduce任務提交詳解

    從大數據開發實戰:HDFS和MapReduce優缺點分析的MapReduce架構可以看出,MapReduce作業執行主要由JobTrackerTaskTracker負責完成。

      客戶端編寫好的MapReduce程序並配置好的MapReduce作業是一個Job,Job被提交給JobTracker后,JobTracker會給該Job一個新的ID值,接着檢查該Job指定的輸出目錄是否存在、輸入文件是否存在,

    如果不存在,則拋出錯誤。同時,JobTracker會根據輸入文件計算輸入分片(input split),這些都檢查通過后,JobTracker就會配置Job需要的資源並分配資源,然后JobTracker就會初始化作業,

    也就是將Job放入一個內部的隊列,讓配置好的作業調度器能調度這個作業,作業調度器會初始化這個Job,初始化就是創建一個正在執行的Job對象(封裝任務和記錄信息),以便JobTracker  跟蹤Job的狀態和進程。

      該Job被作業調度器調度時,作業調度器會獲取輸入分片信息,每個分片創建一個Map任務,並根據TaskTracker的忙閑情況和空閑資源等分配Map任務和Reduce任務到TaskTracker,同時通過心跳機制也可以監控到TaskTracker

    的狀態和進度,也能計算出整個Job的狀態和進度。當JobTracker獲得最后一個完成指定任務的TaskTracker操作成功通知的時候,JobTracker會把整個Job狀態置為成功,然后當查詢Job運行狀態時(注意:這是個異步操作),客戶端

    會查到Job完成的通知。如果job中途失敗,MapReduce也會有相應的機制處理。一般而言,如果不是程序員程序本身有bug,MapReduce錯誤處理機制都能保證提交的Job能正常完成。

 

   3、MapReduce內部執行原理詳解

    那么,MapReduce到底是如何運行的呢? 按照時間順序,MapReduce任務執行包括:輸入分片Map、Shuffle和Reduce等階段,一個階段的輸出正好是下一個階段的輸入,上述各個階段的關系和流程如下:

    

 

 

    下面結合上文的實例問更加深入和詳細地介紹上述過程,如下圖:

 

 

    

 

  4、各環節介紹

   4.1、輸入分片

      在進行Map計算之前,MapReduce會根據輸入文件計算輸入分片。每個輸入分片對應一個Map任務,輸入分片存儲的並非數據本身,而是一個分片長度和一個記錄數據的位置的數組。輸入分片往往和HDFS和block(塊)

    關系密切,假如設定的HDFS的塊的大小是64MB,如果輸入只有一個150MB,那么MapReduce會把此大文件切分為三片(分別為:64MB、64MB和22MB),同樣,如果輸入為兩個文件,其大小分別是22MB和100MB,那么

    MapReduce會把20MB文件作為一個輸入分片,100MB則切分為兩個即64MB和36MB的輸入分片。對於上述實例文件1和文件2,由於非常小,因此分別被作為split1和split2輸入Map任務1和2中(此處只為說明問題,實際處理

    中應該將小文件進行合並,否則如果輸入多個文件而且文件大小均遠小於塊大小,會導致生成多個不必要的Map任務,這也是MapReduce優化計算的一個關鍵點)。

  4.2、Map階段

    在Map階段,各個Map任務會接收到所分配的split,並調用Map函數,逐行執行並輸出鍵值。比如對於上面的例子,map task1 將會接收到input split1,並調用Map函數,其輸出如下的鍵值對:

    big 1, data 1, offline  1, data 1, online 1, data 1, offline 1, online 1, data 1

  4.3、Combiner 階段

      Combiner 階段是可選的的,Combiner其實也是一種Reduce操作,但它是一個本地化的Reduce操作,是Map運算的本地后續操作,主要是在Map計算出中間文件前做的一個簡單的合並重復鍵值的操作,

    例如上述文件1中data出現了4次,Map計算時如果碰到一個data的單詞就會記錄1,這樣就重復了4次,Map任務輸出就有冗余,這樣后續處理和網絡傳輸都被消耗不必要的資源,一次通過Combiner操作可以解決和

    優化次問題。但這一操作是有風險的,使用它的原則是Combiner的輸出不會影響到Reduce 計算的的最終輸入,例如,如果計算只是求總數、最大值及最小值,可以用Combiner操作,但是如果做平均值計算使用Combiner,

    最終Reduce計算結果就會出錯。

  4.4、Shuffle階段

      Map任務的輸出必須經過一個名叫Shuffle的階段才能交給Reduce處理。Shuffle階段是MapReduce的核心,也是奇跡發生的地方,同時Shuffle階段的性能直接影響整個MapReduce的性能。

      那什么是Shuffle呢?一般理解為數據從Map Task輸出到Reduce Task輸入的過程,它決定了Map Task的輸出如何且高效第傳輸給Reduce Task。

      總的來說,Shuffle階段包含在Map和Reduce兩個階段中,在Map階段的Shuffle階段是對Map的結果進行分區(partition)、排序(sort)和分隔(spill),然后將同一分區的輸出合並在一起(merge)並寫在磁盤上,同時按照不同的

    分區划分發送給對應的Reduce(Map輸出的划分和Reduce任務的對應關系由JobTracker確定)的整個過程;Reduce階段的Shuffle又會將各個Map輸出的同一個分區划分的輸出進行合並,然后對合並的結果進行排序,最后交給

    Reduce處理的整個過程。

    下面從Map和Reduce兩端詳細介紹Shuffle階段。

    4.4.1、Map階段Shuffle

      通常MapReduce計算的都是海量數據,而且Map輸出還需要對結果進行排序,內存開銷很大,因此完全在內存中完成是不可能的也是不現實的,所以Map輸出時會在內存里開啟一個環形內存緩存區,並且在配置文件里為

    這個緩存區設置了一個閥值(默認是80%,可以自定義修改此配置)。同時,Map還為輸出操作啟動了一個守護線程,如果緩存區的內存使用達到了閥值,那么這個守護線程就會把80%的內存區內容寫到磁盤上,這個過程叫分隔

    (spill),另外的20%內存可以供Map輸出繼續使用,寫入磁盤和寫入內存操作是互不干擾的,如果緩存區被撐滿了,那么Map就會阻塞寫入內存的操作,待寫入磁盤操作完成后再繼續執行寫入內存操作。

      緩存區內容分隔到磁盤前,會首先進行分區操作,分區的數目由Reduce的數目決定。對應本例,Reduce的數目為2個,那么分區數就是2個,然后對每個分區,后台線程還會按照鍵值對需要寫出的數據進行排序,如果配置了

    Combiner函數,還會進行Combiner操作,以使得更少地數據被寫入磁盤並發送給Reducer。

      每次的分隔操作都會生成一個分隔文件,全部的Map輸出完成后,可能會有很多的分隔文件,因此在map 任務結束前,還要進行合並操作,即將這些分隔文件按照分區合並為單獨的文件。在合並過程中,同樣也會進行排序,

    如果定義了Combiner,也會進行Combiner操作。

      至此,Map階段的所有工作都已經結束,最終生成的文件也會存放在TaskTracker能訪問的某個本地目錄內。每個Reduce Task不斷地從JobTracker那里獲取Map Task是否完成的信息,如果Reduce task得到通知,獲知某台

    TaskTracker上的Map Task執行完成,Shuffle的后半段過程,也就是Reduce階段的Shuffle,便開始啟動。

   4.4.2、Reduce階段Shuffle

    Shuffle 在Reduce階段可以分為三個階段:Copy Map輸出、Merge階段和Reduce處理。

    1、Copy Map輸出:

      如上文所述,Map任務完成后,會通知TaskTracker狀態已完成,TaskTracker進而通知JobTracker(這些通知一般通過心跳機制完成)。對Job來說,JobTracker記錄了Map輸出和TaskTracker的映射關系,同時

    Reduce也會定期向JobTracker獲取Map的輸出與否以及輸位置,一旦拿到輸出位置Reduce就會啟動Copy線程,通過HTTP方式請求Mask Task所在的TaskTracker獲取其輸出文件。因為Map Task早已結束,這些文件就被TaskTracker

    存儲在Map Task所在的本地磁盤中。

    2、Merge階段:

      此處的合並和Map階段的合並類似,復制過來的數據會首先放入內存緩存區中,這里的內存緩存區比Map階段的要靈活很多,它基於JVM的heap size設置,因為Shuffle階段Reduce task並不運行,因此大部分內存

    應該給Shuffle使用;同時此Shuffle的合並階段根據要處理的數據量的不同,也可能會有分隔到磁盤的過程,如果設置了Combiner函數,Combiner操作也會執行。

      從Map階段的Shuffle過程到Reduce階段的Shuffle過程,都提到了合並,那么合並究竟是怎樣的呢?如上面的例子,Map Task1對於offline的鍵值是2,而Map Task2的offline鍵值是1,那么合並就是將offline的鍵值合並為group,

    本例即為:<offline,{2,1}>。

    3、Reduce Task的輸入:

      不到合並后,最后會生成一個最終結果(可能在內存,也可能在磁盤),至此,Reduce Task的輸入准備完畢,下一步就是真正的Reduce操作。

  

  4.5、Reduce階段

    經過Map和Reduce階段的Shuffle過程后,Reduce任務的輸入的准備完畢,相關的數據已經被合並和匯總,Reduce任務只需要調用Reduce函數即可,對於本例即對每個鍵,調用sum邏輯合並value並輸出到HDFS即可,比如對於

    Reduce Task1的offline的鍵,只需要將集合{2,1}相加,輸出offline 3即可。

 

  至此,整個MapReduce的詳細流程和原理介紹完畢,從上述過程中,Shuffle是整個流程中最為核心的部分,也是最復雜的部分。

 

  參考資料:《離線和實時大數據開發實戰》


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM