關注公眾號,大家可以在公眾號后台回復“博客園”,免費獲得作者 Java 知識體系/面試必看資料。
前言
前面我們講了 MapReduce 的編程模型,我們知道他主要分成兩大階段來完成一項任務,一是 map 階段對我們的數據進行分開計算,第二是 reduce 階段,對 map 階段計算產生的結果再進行匯總。
還寫了一個非常經典的,類似於Java 中 HelloWorld 一樣的 WordCount 代碼。今天我們就根據這個代碼來闡述整個 MapReduce 的運行過程。
先苦口婆心的告訴你,這個知識點是非常非常非常之重要,之前面的 5 家公司,有 3 家公司都問了這個過程,另外兩家問了 Yarn 的運行機制,這是后面會去講的內容,你必須得懂大體的流程是怎么樣子,如果能去研究搞清楚每個細節,那當然最好的。
從數據進如到處理程序到處理完成后輸出到存儲中,整個過程我們大體分為如下 5 個階段:
-
Input Split 或 Read 數據階段
Input Split,是從數據分片出發,把數據輸入到處理程序中。Read 則是從處理程序出發反向來看,把數據從文件中讀取到處理程序中來。這個階段表達的是我們數據從哪里來。這是整個過程的開始。
-
Map階段
當數據輸入進來以后,我們進行的是 map 階段的處理。例如對一行的單詞進行分割,然后每個單詞進行計數為 1 進行輸出。
-
Shuffle 階段
Shuffle 階段是整個 MapReduce 的核心,介於 Map 階段跟 Reduce 階段之間。在 Spark 中也有這個概念,可以說你理解了這個概念,到時候再學習其他的大數據計算框架原理的時候,會給你帶來非常大的幫助,因為他們大多理念是相同的,下面會重點講解這個過程。
-
Reduce 階段
數據經過 Map 階段處理,數據再經過 Shuffle 階段,最后到 Reduce ,相同的 key 值的數據會到同一個 Reduce 任務中進行最后的匯總。
-
Output 階段
這個階段的事情就是將 Reduce 階段計算好的結果,存儲到某個地方去,這是整個過程的結束。
整個執行流程圖
一圖勝千言:

如果看不清晰,我上傳了一份完整的在 gayHub 上面,地址:
(https://raw.githubusercontent.com/heyxyw/bigdata/master/bigdatastudy/doc/img/mapreduce/mr-Implementation-process.png)
當然了,不太了解或者剛接觸可能一開始看比較懵逼,我剛開始也是。下面我們就一塊一塊的來拆分講解,最后差不多就明白了。
Input Split 數據階段
Input Split 顧明思議,輸入分片 ,為什么我們會叫 輸入分片呢?因為數據在進行 Map 計算之前,MapReduce 會根據輸入文件進行切分,因為我們需要分布式的進行計算嘛,那么我得計算出來我的數據要切成多少片,然后才好去對每片數據分配任務去處理。
每個輸入分片會對應一個 Map 任務,輸入分片存儲的並非數據本身,而是一個分片長度和一個記錄數據的位置數據,它往往是和 HDFS 的 block(塊) 進行關聯的。
假如我們設定每個 HDFS 的塊大小是 128M,如果我們現在有3個文件,大小分別是 10M,129M,200M,那么MapReduce 對把 10M 的文件分為一個分片,129M 的數據文件分為2個分片,200M 的文件也是分為兩個分片。那么此時我們就有 5 個分片,就需要5個 Map 任務去處理,而且數據還是不均勻的。
如果有非常多的小文件,那么就會產生大量的 Map 任務,處理效率是非常低下的。
這個階段使用的是 InputFormat 組件,它是一個接口 ,默認使用的是 TextInputFormat 去處理,他會調用 readRecord() 去讀取數據。
這也是MapReduce 計算優化的一個非常重要的一個點,**面試被考過**。如何去優化這個小文件的問題呢?
-
最好的辦法:在數據處理系統的最前端(預處理、采集),就將小文件先進行合並了,再傳到 HDFS 中去。
-
補救措施:如果已經存在大量的小文件在HDFS中了,可以使用另一種 InputFormat 組件CombineFileInputFormat 去解決,它的切片方式跟 TextInputFormat 不同,它會將多個小文件從邏輯上規划到一個切片中,這樣,多個小文件就可以交給一個 Map 任務去處理了。
Map階段
將 Map 階段的輸出作為 Reduce 階段的輸入的過程就是 Shuffle 。 這也是整個 MapReduce 中最重要的一個環節。
一般MapReduce 處理的都是海量數據,Map 輸出的數據不可能把所有的數據都放在內存中,當我們在map 函數中調用 context.write() 方法的時候,就會調用 OutputCollector 組件把數據寫入到處於內存中的一個叫環形緩沖區的東西。
環形緩沖區默認大小是 100M ,但是只寫80%,同時map還會為輸出操作啟動一個守護線程,當到數據達到80%的時候,守護線程開始清理數據,把數據寫到磁盤上,這個過程叫 spill 。
數據在寫入環形緩沖區的時候,數據會默認根據key 進行排序,每個分區的數據是有順序的,默認是 HashPartitioner。當然了,我們也可以去自定義這個分區器。
每次執行清理都產生一個文件,當 map 執行完成以后,還會有一個合並文件文件的過程,其實他這里跟 Map 階段的輸入分片(Input split)比較相似,一個 Partitioner 對應一個 Reduce 作業,如果只有一個 reduce 操作,那么 Partitioner 就只有一個,如果有多個 reduce 操作,那么 Partitioner 就有多個。Partitioner 的數量是根據 key 的值和 Reduce 的數量來決定的。可以通過 job.setNumReduceTasks() 來設置。
這里還有一個可選的組件 Combiner ,溢出數據的時候如果調用 Combiner 組件,它的邏輯跟 reduce 一樣,相同的key 先把 value 進行相加,前提是合並並不會改變業務,這樣就不糊一下傳輸很多相同的key 的數據,從而提升效率。
舉個例子,在溢出數據的時候,默認不使用 Combiner,數據是長這樣子: <a,1>,<a,2>,<c,4>。 當使用 Combiner 組件時,數據則是: <a,3>,<c,4> 。把 a 的數據進行了合並。
Reduce 階段
在執行 Reduce 之前,Reduce 任務會去把自己負責分區的數據拉取到本地,還會進行一次歸並排序並進行合並。
Reduce 階段中的 reduce 方法,也是我們自己實現的邏輯,跟Map 階段的 map 方法一樣,只是在執行 reduce 函數的時候,values 為 同一組 key 的value 迭代器。在 wordCount 的例子中,我們迭代這些數據進行疊加。最后調用 context.write 函數,把單詞和總數進行輸出。
Output 階段
在 reduce 函數中調用 context.write 函數時,會調用 OutPutFomart 組件,默認實現是 TextOutPutFormat ,把數據輸出到目標存儲中,一般是 HDFS。
擴展
上面我們只是講解了大體的流程,這里給大家拋幾個問題?也是面試中經常被問到的。
1. 文件切分是怎么切的?一個文件到底會切成幾分?算法是怎么樣的?
2. Map 任務的個數是怎么確定的?
上面的問題,給大家貼兩個鏈接:
MapReduce Input Split(輸入分/切片)詳解:
https://blog.csdn.net/dr_guo/article/details/51150278
源碼解析 MapReduce作業切片(Split)過程:
https://blog.csdn.net/u010010428/article/details/51469994
總結
MapReduce 的執行流程到這里就大致講解完成了,希望你也能畫出來上面的大圖。能夠理解到大體的流程,並能掌握關鍵的環節 Shuffle 。以后你還會在其他的大數據組件上聽到這個詞。
后面將給大家帶來 Yarn 的大致運行機制,然后再為大家講解 WordCount 運行的整個過程。
敬請期待。
Java 極客技術公眾號,是由一群熱愛 Java 開發的技術人組建成立,專注分享原創、高質量的 Java 文章。如果您覺得我們的文章還不錯,請幫忙贊賞、在看、轉發支持,鼓勵我們分享出更好的文章。

