MapReduce 過程詳解


Hadoop 越來越火, 圍繞Hadoop的子項目更是增長迅速, 光Apache官網上列出來的就十幾個, 但是萬變不離其宗, 大部分項目都是基於Hadoop common

MapReduce 更是核心中的核心。那么到底什么是MapReduce, 它具體是怎么工作的呢?

關於它的原理, 說簡單也簡單, 隨便畫個圖噴一下Map 和 Reduce兩個階段似乎就完了。 但其實這里面還包含了Sort, Partition, Shuffle, Combine, Merge等子階段,尤其是Shuffle, 很多資料里都把它稱為MapReduce的“心臟”, 和所謂“奇跡發生的地方”。真正能說清楚其中關系的人就沒那么多了。可是了解這些流程對我們理解和掌握MapReduce 並對其進行調優是非常有用的。 

本文名為詳解, 其實筆者水平有限, 也就是結合自己的一些理解爭取能夠深入淺出地描述一下整個過程, 如有錯誤, 敬請指出。

 

首先我們看一副圖, 包含了從頭到尾的整個過程, 后面對所有步驟的解釋都以此圖作為參考 (此圖100%原創)

 

這張圖簡單來說, 就是說在我們常見的Map 和 Reduce 之間還有一系列的過程, 其中包括Partition, Sort, Combine, Copy, Merge等. 而這些過程往往被統稱為"Shuffle" 也就是 “混洗”. 而Shuffle 的目的就是對數據進行梳理,排序,以更科學的方式分發給每個Reducer,以便能夠更高效地進行計算和處理。 (難怪人家說這是奇跡發生的地方, 原來這里面有這么多花花, 能沒奇跡么?)

如果您是Hadoop的大牛, 看了這幅圖可能馬上要跳出來了, 不對! 還有一個spill 過程雲雲...

且慢, 關於spill, 我認為只是一個實現細節, 其實就是MapReduce利用內存緩沖的方式提高效率, 整個的過程和原理並沒有受影響, 所以在此處忽略掉spill 過程, 以便更好理解。

 

光看原理圖還是有點費解是吧? 沒錯! 雷子一直認為, 沒有例子的文章就是耍流氓 :) 所以我們就用大家都耳熟能詳的WordCount 作為例子, 開始我們的討論。

先創建兩個文本文件, 作為我們例子的輸入:

File 1 內容:
My name is Tony
My company is pivotal

File 2 內容:
My name is Lisa
My company is EMC

 

1. 第一步, Map

顧名思義, Map 就是拆解.

首先我們的輸入就是兩個文件, 默認情況下就是兩個split, 對應前面圖中的split 0, split 1

兩個split 默認會分給兩個Mapper來處理, WordCount例子相當地暴力, 這一步里面就是直接把文件內容分解為單詞和 1 (注意, 不是具體數量, 就是數字1)其中的單詞就是我們的主健,也稱為Key, 后面的數字就是對應的值,也稱為value.

那么對應兩個Mapper的輸出就是:

split 0

My       1
name    1
is         1
Tony     1
My          1
company     1
is       1
Pivotal   1

split 1

My       1
name    1
is       1
Lisa     1
My       1
company  1
is       1
EMC     1

2. Partition

Partition 是什么? Partition 就是分區。

為什么要分區? 因為有時候會有多個Reducer, Partition就是提前對輸入進行處理, 根據將來的Reducer進行分區. 到時候Reducer處理的時候, 只需要處理分給自己的數據就可以了。 

如何分區? 主要的分區方法就是按照Key 的不同,把數據分開,其中很重要的一點就是要保證Key的唯一性, 因為將來做Reduce的時候有可能是在不同的節點上做的, 如果一個Key同時存在於兩個節點上, Reduce的結果就會出問題, 所以很常見的Partition方法就是哈希。

結合我們的例子, 我們這里假設有兩個Reducer, 前面兩個split 做完Partition的結果就會如下:

split 0

Partition 1:
company 1
is  1
is    1
Partition 2:
My   1
My    1
name  1
Pivotal 1
Tony   1

split 1

Partition 1:
company 1
is    1 is 1
EMC   1
Partition 2:
My   1
My 1
name   1
Lisa
1

其中Partition 1 將來是准備給Reducer 1 處理的, Partition 2 是給Reducer 2 的

這里我們可以看到, Partition 只是把所有的條目按照Key 分了一下區, 沒有其他任何處理, 每個區里面的Key 都不會出現在另外一個區里面。

 

3. Sort

Sort 就是排序嘍, 其實這個過程在我來看並不是必須的, 完全可以交給客戶自己的程序來處理。 那為什么還要排序呢? 可能是寫MapReduce的大牛們想,“大部分reduce 程序應該都希望輸入的是已經按Key排序好的數據, 如果是這樣, 那我們就干脆順手幫你做掉啦, 請叫我雷鋒!”  ......好吧, 你是雷鋒.

那么我們假設對前面的數據再進行排序, 結果如下:

split 0

Partition 1:
company 1
is  1
is    1
Partition 2:
My   1
My    1
name  1
Pivotal 1
Tony   1

 split 1

Partition 1:
company 1
EMC   1
is    1 is 1

Partition 2:
Lisa   1
My   1
My 1
name   1

 這里可以看到, 每個partition里面的條目都按照Key的順序做了排序

4. Combine

什么是Combine呢? Combine 其實可以理解為一個mini Reduce 過程, 它發生在前面Map的輸出結果之后, 目的就是在結果送到Reducer之前先對其進行一次計算, 以減少文件的大小, 方便后面的傳輸。 但這步也不是必須的。

按照前面的輸出, 執行Combine:

split 0

Partition 1:
company 1
is  2

Partition 2:
My   2
name  1
Pivotal 1
Tony   1

 split 1

Partition 1:
company 1
EMC   1
is    2

Partition 2:
Lisa   1
My   2
name   1

 我們可以看到, 針對前面的輸出結果, 我們已經局部地統計了is 和My的出現頻率, 減少了輸出文件的大小。

5. Copy

下面就要准備把輸出結果傳送給Reducer了。 這個階段被稱為Copy, 但事實上雷子認為叫他Download更為合適, 因為實現的時候, 是通過http的方式, 由Reducer節點向各個mapper節點下載屬於自己分區的數據。

那么根據前面的Partition, 下載完的結果如下:

Reducer 節點 1 共包含兩個文件:

Partition 1:
company 1
is  2

 

Partition 1:
company  1
EMC    1
is    2

 

Reducer 節點 2 也是兩個文件:

  Partition 2:
My     2
name  1
Pivotal 1
Tony   1

 

Partition 2:
Lisa   1
My     2
name   1

這里可以看到, 通過Copy, 相同Partition 的數據落到了同一個節點上。

6. Merge

如上一步所示, 此時Reducer得到的文件是從不同Mapper那里下載到的, 需要對他們進行合並為一個文件, 所以下面這一步就是Merge, 結果如下:

Reducer 節點 1

company 1
company 1
EMC   1 is  
2
is    2

 

Reducer 節點 2

Lisa  1
My   2
My    2 name  1
name  1 Pivotal 1 Tony   1

7. Reduce

終於可以進行最后的Reduce 啦...這步相當簡單嘍, 根據每個文件中的內容最后做一次統計, 結果如下:

Reducer 節點 1

company 2
EMC    1 is  4

Reducer 節點 2

Lisa  1
My   4 name  2 Pivotal 1 Tony   1

至此大功告成! 我們成功統計出兩個文件里面每個單詞的數目, 同時把它們存入到兩個輸出文件中, 這兩個輸出文件也就是傳說中的 part-r-00000 和 part-r-00001, 看看兩個文件的內容, 再回頭想想最開始的Partition, 應該是清楚了其中的奧秘吧。

如果你在你自己的環境中運行的WordCount只有part-r-00000一個文件的話, 那應該是因為你使用的是默認設置, 默認一個job只有一個reducer

如果你想設兩個, 你可以:

1. 在源代碼中加入  job.setNumReduceTasks(2), 設置這個job的Reducer為兩個
或者
2. 在 mapred-site.xml 中設置下面參數並重啟服務
<property>
  <name>mapred.reduce.tasks</name>
  <value>2</value>
</property>

這樣, 整個集群都會默認使用兩個Reducer

 

結束語:

本文大致描述了一下MapReduce的整個過程以及每個階段所作的事情, 並沒有涉及具體的job,resource的管理和控制, 因為那個是第一代MapReduce框架和Yarn框架的主要區別。 而兩代框架中上述MapReduce 的原理是差不多的,希望對大家有所幫助。 

 

版權聲明:

本文由 雷子-曉飛爸 所有,發布於http://www.cnblogs.com/npumenglei/ 如果轉載,請注明出處,在未經作者同意下將本文用於商業用途,將追究其法律責任。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM