目錄
Hadoop 中的 MapReduce是一個使用簡單的軟件框架,基於它寫出來的應用程序能夠運行在由上千個機器組成的大型集群上,並以一種可靠容錯並行處理TB級別的數據集
Hadoop MapReduce 源於 Google 在2004年12月份發表的 MapReduce 論文。Hadoop MapReduce 其實就是 Google MapReduce 的一個克隆版本
MapReduce 為什么如此受歡迎?尤其現在互聯網+時代,互聯網+公司都在使用 MapReduce。MapReduce 之所以如此受歡迎,它主要有以下幾個特點:
1、MapReduce 易於編程
它簡單的實現一些接口,就可以完成一個分布式程序,這個分布式程序可以分布到大量廉價的 PC 機器運行。也就是說你寫一個分布式程序,跟寫一個簡單的串行程序是一模一樣的。 就是因為這個特點使得 MapReduce 編程變得非常流行。
2、良好的擴展性
當你的計算資源不能得到滿足的時候,你可以通過簡單的增加機器來擴展它的計算能力。
3、高容錯性
MapReduce 設計的初衷就是使程序能夠部署在廉價的 PC 機器上,這就要求它具有很高的容錯性。比如其中一台機器掛了,它可以把上面的計算任務轉移到另外一個節點上面上運行,不至於這個任務運行失敗,而且這個過程不需要人工參與,而完全是由 Hadoop 內部完成的。
4、適合 PB 級以上海量數據的離線處理
它適合離線處理而不適合在線處理。比如像毫秒級別的返回一個結果,MapReduce 很難做到
MapReduce 雖然具有很多的優勢,但是它也有不擅長的地方。這里的不擅長不代表它不能做,而是在有些場景下實現的效果差,並不適合 MapReduce 來處理,主要表現在以下幾個方面:
1、實時計算
MapReduce 無法像 Mysql 一樣,在毫秒或者秒級內返回結果。
2、流式計算
流式計算的輸入數據時動態的,而 MapReduce 的輸入數據集是靜態的,不能動態變化。這是因為 MapReduce 自身的設計特點決定了數據源必須是靜態的。
3、DAG(有向圖)計算
多個應用程序存在依賴關系,后一個應用程序的輸入為前一個的輸出。在這種情況下,MapReduce 並不是不能做,而是使用后,每個MapReduce 作業的輸出結果都會寫入到磁盤,會造成大量的磁盤IO,導致性能非常的低下
為了分析 MapReduce 的編程模型,這里我們以 WordCount 為實例。就像 Java、C++等編程語言的入門程序 hello word 一樣,WordCount 是 MapReduce 最簡單的入門程序。
下面我們就來逐步分析
1、場景:假如有大量的文件,里面存儲的都是單詞。
類似應用場景:WordCount 雖然很簡單,但它是很多重要應用的模型。
1) 搜索引擎中,統計最流行的K個搜索詞。
2) 統計搜索詞頻率,幫助優化搜索詞提示。
2、任務:我們該如何統計每個單詞出現的次數?
3、將問題規范為:有一批文件(規模為 TB 級或者 PB 級),如何統計這些文件中所有單詞出現的次數。
4、解決方案:首先,分別統計每個文件中單詞出現的次數;然后,累加不同文件中同一個單詞出現次數
通過上面的分析可知,它其實就是一個典型的 MapReduce 過程。下面我們通過示意圖來分析 MapReduce 過程

上圖的流程大概分為以下幾步:
1、假設一個文件有三行英文單詞作為 MapReduce 的Input(輸入),這里經過 Splitting過程把文件分割為3塊。分割后的3塊數據就可以並行處理,每一塊交給一個 map 線程處理。
2、每個map線程中,以每個單詞為key,以1作為詞頻數value,然后輸出。
3、每個map的輸出要經過shuffling(混洗),將相同的單詞key放在一個桶里面,然后交給reduce處理。
4、reduce接受到shuffle后的數據,會將相同的單詞進行合並,得到每個單詞的詞頻數,最后將統計好的每個單詞的詞頻數作為輸出結果。
上述就是 MapReduce 的大致流程,前兩步可以看做map階段,后兩步可以看做reduce 階段。下面我們來看看MapReduce大致實現
1、Input:首先MapReduce輸入的是一系列key/value對。key表示每行偏移量,value代表每行輸入的單詞。
2、用戶提供了map函數和reduce函數的實現
map(k,v) ——> list(k1,v1)
reduce(k1,list(v1)) ——>(k2,v2)
map函數將每個單詞轉化為key/value對輸出,這里key為每個單詞,value為詞頻1。(k1,v1)是map輸出的中間key/value結果對。reduce將相同單詞的所有詞頻進行合並,比如將單詞k1,詞頻為list(v1),合並為(k2,v2)。reduce 合並完之后,最終輸出一系列(k2,v2)鍵值對
下面我們來看一下 MapReduce 的偽代碼
map(key,value):// map 函數,key代表偏移量,value代表每行單詞
for each word w in value:// 循環每行數據,輸出每個單詞和詞頻的鍵值對(w,1)
emit(w,1)
reduce(key,values):// reduce 函數,key代表一個單詞,value代表這個單詞的所有詞頻數集合
result=0
for each count v in values: // 循環詞頻集合,求出該單詞的總詞頻數,然后輸出(key,result)
result+=v
emit(key,result)
講到這里,我們可以對 MapReduce 做一個總結:MapReduce將作業的整個運行過程分為兩個階段:Map階段和Reduce階段。
1、Map階段
Map 階段是由一定數量的Map Task組成。這些Map Task可以同時運行,每個Map Task又是由以下三個部分組成。
1) 對輸入數據進行解析的組件:InputFormat
因為不同的數據可能存儲的數據格式不一樣,這就需要有一個InputFormat組件來解析這些數據。默認情況下,它提供了一個TextInputFormat來解釋數據。TextInputFormat 就是我們前面提到的文本文件輸入格式,它會將文件的每一行解釋成(key,value),key代表每行偏移量,value代表每行數據內容。通常情況我們不需要自定義 InputFormat,因為 MapReduce提供了很多種InputFormat的實現,我們根據不同的數據格式,選擇不同的 InputFormat 來解釋就可以了。
2)輸入數據處理:Mapper
這個 Mapper 是必須要實現的,因為根據不同的業務對數據有不同的處理。
3)數據分組:Partitioner
Mapper 數據處理之后輸出之前,輸出key會經過Partitioner分組或者分桶選擇不同的reduce。默認的情況下,Partitioner會對map輸出的key進行hash取模,比如有6個Reduce Task,它就是模(mod)6,如果key的hash值為0,就選擇第0個 Reduce Task,如果key的hash值為1,就選擇第一個 Reduce Task。這樣不同的map對相同單詞key,它的hash值取模是一樣的,所以會交給同一個reduce來處理。
2、Reduce 階段
Reduce階段由一定數量的Reduce Task組成。這些Reduce Task可以同時運行,每個 Reduce Task又是由以下四個部分組成。
1) 數據運程拷貝
Reduce Task 要運程拷貝每個map處理的結果,從每個map中讀取一部分結果。每個 Reduce Task拷貝哪些數據,是由上面 Partitioner 決定的。
2) 數據按照key排序
Reduce Task讀取完數據后,要按照key進行排序。按照key排序后,相同的key被分到一組,交給同一個Reduce Task處理。
3) 數據處理:Reducer
以WordCount為例,相同的單詞key分到一組,交個同一個Reducer處理,這樣就實現了對每個單詞的詞頻統計。
4) 數據輸出格式:OutputFormat
Reducer統計的結果,將按照OutputFormat格式輸出。默認情況下的輸出格式為 TextOutputFormat,以WordCount為例,這里的key為單詞,value為詞頻數。
InputFormat、Mapper、Partitioner、Reducer和OutputFormat 都是用戶可以實現的。通常情況下,用戶只需要實現 Mapper和Reducer,其他的使用默認實現就可以了。
下面我們通過MapReduce的內部邏輯,來分析MapReduce的數據處理過程。我們以WordCount為例,來看一下mapreduce 內部邏輯,如下圖所示

MapReduce 內部邏輯的大致流程主要由以下幾步完成
1、首先將 HDFS 中的數據以 Split 方式作為 MapReduce 的輸入。以前文章提到,HDFS中的數據是以 block存儲,這里怎么又變成了以Split 作為輸入呢?其實 block 是 HDFS 中的術語,Split 是 MapReduce 中的術語。默認的情況下,一個 Split 可以對應一個 block,當然也可以對應多個block,它們之間的對應關系是由 InputFormat 決定的。默認情況下,使用的是 TextInputFormat,這時一個Split對應一個block。 假設這里有4個block,也就是4個Split,分別為Split0、Split1、Split2和Split3。這時通過 InputFormat 來讀每個Split里面的數據,它會把數據解析成一個個的(key,value),然后交給已經編寫好的Mapper 函數來處理。
2、每個Mapper 將輸入(key,value)數據解析成一個個的單詞和詞頻,比如(a,1)、(b,1)和(c,1)等等。
3、Mapper解析出的數據,比如(a,1),經過 Partitioner之后,會知道該選擇哪個Reducer來處理。每個 map 階段后,數據會輸出到本地磁盤上。
4、在reduce階段,每個reduce要進行shuffle讀取它所對應的數據。當所有數據讀取完之后,要經過Sort全排序,排序之后再交給 Reducer 做統計處理。比如,第一個Reducer讀取了兩個的(a,1)鍵值對數據,然后進行統計得出結果(a,2)。
5、將 Reducer 的處理結果,以OutputFormat數據格式輸出到 HDFS 的各個文件路徑下。這里的OutputFormat默認為TextOutputFormat,key為單詞,value為詞頻數,key和value之間的分割符為"\tab"。 由上圖所示,(a 2)輸出到Part-0,(b 3)輸出到Part-1,(c 3)輸出到Part-2
和HDFS一樣,MapReduce也是采用Master/Slave的架構,其架構圖如下所示

MapReduce包含四個組成部分,分別為Client、JobTracker、TaskTracker和Task,下面我們詳細介紹這四個組成部分。
1)Client 客戶端
每一個 Job 都會在用戶端通過 Client 類將應用程序以及配置參數 Configuration 打包成 JAR 文件存儲在 HDFS,並把路徑提交到 JobTracker 的 master 服務,然后由 master 創建每一個 Task(即 MapTask 和 ReduceTask) 將它們分發到各個 TaskTracker 服務中去執行。
2)JobTracker
JobTracke負責資源監控和作業調度。JobTracker 監控所有TaskTracker 與job的健康狀況,一旦發現失敗,就將相應的任務轉移到其他節點;同時,JobTracker 會跟蹤任務的執行進度、資源使用量等信息,並將這些信息告訴任務調度器,而調度器會在資源出現空閑時,選擇合適的任務使用這些資源。在Hadoop中,任務調度器是一個可插拔的模塊,用戶可以根據自己的需要設計相應的調度器。
3)TaskTracker
TaskTracker 會周期性地通過Heartbeat 將本節點上資源的使用情況和任務的運行進度匯報給JobTracker,同時接收JobTracker 發送過來的命令並執行相應的操作(如啟動新任務、殺死任務等)。TaskTracker 使用"slot"等量划分本節點上的資源量。"slot"代表計算資源(CPU、內存等)。一個Task 獲取到一個slot 后才有機會運行,而Hadoop 調度器的作用就是將各個TaskTracker 上的空閑slot分配給Task 使用。slot分為Map slot 和Reduce slot 兩種,分別供Map Task 和Reduce Task 使用。TaskTracker 通過slot 數目(可配置參數)限定Task 的並發度。
4)Task
Task 分為Map Task 和Reduce Task 兩種,均由TaskTracker 啟動。HDFS 以固定大小的block 為基本單位存儲數據,而對於MapReduce 而言,其處理單位是split。split 是一個邏輯概念,它只包含一些元數據信息,比如數據起始位置、數據長度、數據所在節點等。它的划分方法完全由用戶自己決定。但需要注意的是,split 的多少決定了Map Task 的數目,因為每個split 只會交給一個Map Task 處理。Split 和 Block的關系如下圖所示

Map Task 執行過程如下圖 所示:由該圖可知,Map Task 先將對應的split 迭代解析成一個個key/value 對,依次調用用戶 自定義的map() 函數進行處理,最終將臨時結果存放到本地磁盤上, 其中臨時數據被分成若干個partition,每個partition 將被一個Reduce Task 處理

Reduce Task 執行過程下圖所示。該過程分為三個階段:
- 從遠程節點上讀取Map Task 中間結果(稱為"Shuffle 階段");
- 按照key 對key/value 對進行排序(稱為"Sort 階段");
- 依次讀取< key, value list>,調用用戶自定義的reduce() 函數處理,並將最終結果存到HDFS 上(稱為"Reduce 階段")

MapReduce 最大的特點之一就是有很好的容錯性,即使你的節點掛掉了1個、2個、3個,都是沒有問題的, 它都可以照常來運行,把你的作業或者應用程序運行完成。不會出現某個節點掛了,你的作業就運行失敗這種情況。 那么 MapReduce 到底是通過什么樣的機制,使它具有這么好的容錯性呢?下面我們依次來介紹一下。
1、JobTracker
很不幸,JobTracker 存在單點故障,一旦出現故障,整個集群就不可用。這個是1.0里面出現的問題,在2.0里面這個問題已經得到了解決。不過大家放心,即使在1.0中,MapReduce也不會經常出現故障。它可能一年也就是出現幾次故障,出現故障之后,你重啟一下,再把作業重新提交就可以了,它不會像 HDFS 那樣出現數據的丟失。 因為 MapReduce 是一個計算框架,計算過程是可以重現的,即使某個服務掛掉了,你重啟一下服務,然后把作業重新提交,也是不會影響你的業務的。
2、TaskTracker
TaskTracker 周期性的向 JobTracker 匯報心跳,如果一定的時間內沒有匯報這個心跳,JobTracker 就認為該TaskTracker 掛掉了,它就會把上面所有任務調度到其它TaskTracker(節點)上運行。這樣即使某個節點掛了,也不會影響整個集群的運行。
3、MapTask和ReduceTask
MapTask和ReduceTask 也可能運行掛掉。比如內存超出了或者磁盤掛掉了,這個任務也就掛掉了。這個時候 TaskTracker 就會把每個MapTask和ReduceTask的運行狀態回報給 JobTracker,JobTracker 一旦發現某個Task掛掉了,它就會通過調度器把該Task調度到其它節點上。這樣的話,即使任務掛掉了,也不會影響應用程序的運行
MapReduce 計算框架並沒有直接調用 CPU和內存等多維度資源,它把多維度資源抽象為 "slot",用 "slot" 來描述資源的數量。管理員可以在每個節點上單獨配置slot個數。slot可以分為map slot和reduce slot。從一定程度上,slot可以看做"任務運行並行度"。如果某個節點配置了5個map slot,那么這個節點最多運行5個Map Task;如果某個節點配置了3個reduce slot,那么該節點最多運行3個Reduce Task。下面我們分別介紹 Map slot和Reduce slot。
1、Map slot
- Map slot可用於運行 Map Task的資源,而且只能運行Map Task。
- 每個Map Task通常使用一個map slot。而比如像容量調度器,它可以有比較大的 MapTask。這樣的MapTask使用內存比較多,那么它可能使用多個map slot。
2、Reduce slot
- Reduce slot 可用於運行ReduceTask,而且只能運行ReduceTask。
- 每個ReduceTask通常使用一個reduce slot。而比如像容量調度器,它可以有比較大的 ReduceTask。這樣的ReduceTask使用內存比較多,那么它可能使用多個reduce slot
如果,您認為閱讀這篇博客讓您有些收獲,不妨點擊一下右下角的【推薦】。
如果,您希望更容易地發現我的新博客,不妨點擊一下左下角的【關注我】。
如果,您對我的博客所講述的內容有興趣,請繼續關注我的后續博客,我是【劉超★ljc】。
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。
