通過前面的實例,可以基本了解MapReduce對於少量輸入數據是如何工作的,但是MapReduce主要用於面向大規模數據集的並行計算。所以,還需要重點了解MapReduce的並行編程模型和運行機制。
我們知道,MapReduce計算模型主要由三個階段構成:Map、shuffle、Reduce。Map和Reduce操作需要我們自己定義相應Map類和Reduce類。而shuffle則是系統自動幫我們實現的,是MapReduce的“心臟”,是奇跡發生的地方。是其主要流程基本如下圖所示:

1、數據的輸入
首先,對於MapReduce所要處理的數據,應當存儲在分布式文件系統(如HDFS)中,通過使用Hadoop資源管理系統YARN,將MapReduce計算轉移到存儲有部分數據的機器上。
對於輸入數據,首先要對其進行輸入分片,Hadoop為每個輸入分片構建一個map任務,在該任務中調用map函數對分片中的每條數據記錄進行處理。處理每個分片的時間小於處理整個數據所花的時間,因此,只要合理分片,整個處理過程就能獲得很好的負載均衡。
而關於合理分片,我們不難想到:如果分片數據太大,那么處理所花的時間比較長,整體性能提升不多;反之,如果分片數據切分的太小,那么管理分片的時間和構建map任務的時間又會加大。因此分片要合理,一般情況下,一個合理的分片趨向於一個HDFS塊的大小,默認為128M(這也跟map任務的數據本地化有關,當大於一個HDFS塊的大小時,就會導致網絡傳輸,降低性能)。
2、Map階段
Map任務在存儲有輸入數據的節點上運行,這樣可以獲得最佳性能,因為無需耗用寶貴的集群帶寬,這就是“數據本地化”的優勢(主要就是減少了網絡傳輸)。
每個Map任務處理一個輸入分片的數據。對於該Map任務所處理的那個分片的數據,通過調用map函數對分片中的每條數據記錄進行處理,而Map函數是由用戶實現的,因此這里的計算邏輯是用戶控制的,但是必須滿足輸入的是鍵值對,輸出的也是鍵值對,即完成以下過程:
<k1,v1> ——> list<k2,v2>
這里需要着重理解的是:map任務將其輸出寫入本地磁盤,而非HDFS。這是因為map的輸出只是一個中間結果,一旦整體作業完成就可以刪除,因此沒有必要在HDFS中進行備份存儲,如果在本地磁盤發生丟失,那么只需要在另一個節點上重新執行該map任務即可。
而將Map的輸出寫入到本地磁盤的整個過程是相當復雜的,這個過程就是在map端的shuffle過程,主要分為以下幾步:
(1)map的輸出寫入環形緩沖區
(2)溢出寫:主要包括:分區-->排序(快速排序)-->合並(combiner,如果有的話)--> 溢出寫到磁盤
(3)歸並:將多個溢出文件歸並為一個輸出文件
接下來主要介紹這個map端的shuffle過程。

Map函數產生輸出時,不是簡單的寫入到磁盤。首先,每個map任務在內存中都有一個環形緩沖區,一般默認大小為100M。Map開始產生輸出后,先將數據存入這個緩沖區,當緩沖區存儲內容達到閾值(比如80%)時,啟動一個后台線程將內容溢出(spill)到本地磁盤。在溢出寫的同時,map繼續輸出到緩沖區,如果此期間緩沖區填滿,則需要阻塞等待寫磁盤的過程完成。
而這個后台線程將內容溢出到磁盤的過程也不是直接的簡單寫,它首先根據這個數據最終要傳的reducer把數據分成相應的分區(patition),在每個分區中按照鍵在內存中對數據進行排序。如果指定了combiner,那么就在排序后得到的輸出上運行,使得map的輸出更加緊湊,從而減少寫到磁盤的數據和傳遞給reducer的數據。
最后,由於每次緩沖區內容達到閾值,都會產生一個溢出文件,最終在map任務結束時,可能會有多個溢出文件,在結束之前將這些溢出文件進行歸並,形成一個輸出文件。
理解這個過程后,我們就可以知道:map任務結束后最終寫到本地磁盤的是一個已經分區並且已經排序的輸出文件。順便一提,將map的輸出寫入到磁盤時進行壓縮是個好辦法,因為減小了寫到磁盤的數據和傳遞給reducer的數據量。
如下的圖可以更好的理解這一過程。這里注意合並和歸並的區別:
兩個鍵值對<“a”,1>和<“a”,1>,如果合並,會得到<“a”,2>,如果歸並,會得到<“a”,<1,1>>。

3、Reduce階段
Reduce任務並不具備數據本地化的優勢,單個reduce任務的輸入通常情況下是來自於所有Mapper的輸出,因此,排過序的map輸出需要通過網絡傳輸到運行reduce任務的節點,數據在這里進行歸並,然后由用戶定義的reduce函數進行處理,最終將得到的輸出結果存儲在HDFS中,從而完成整個MapReduce作業,即完成以下過程:
<k2,list< v2>> ——> <k3,v3>
這里需要注意的是:Reducer任務的數量並不是由輸入數據的大小決定的,而是可以獨立指定的,可以在程序中指定reduce任務的數量,可以有1個,可以有多個,甚至沒有reduce任務也是可以的。
如果有多個reduce任務,每個map任務就會將自己的輸出進行分區(partition),每個renduce任務對應一個分區,每個鍵對應的所有鍵值對記錄都在同一個分區中(因為同樣的key經過同樣的哈希函數可以得到相同的結果,就會進入同一個分區)。分區函數通常使用默認的哈希函數。
接下來我們還是重點關注reduce端的shuffle過程,主要分為以下三個過程:
(1)復制(Copy)
(2)歸並(Merge)
(3)reduce

(1)復制(Copy)
首先,在復制階段,由於map的輸出位於運行該任務的節點的本地磁盤上,而reduce任務需要集群上若干個map任務的輸出作為其分區文件,每個map任務完成時間可能不同,所以一旦有某個任務完成時,reduce任務就開始復制其輸出。reduce任務有少量的復制線程(默認是5個),因此可以並行地復制map的輸出。
這里有兩個問題可能會有疑問:
1、Reducer如何知道自己應該處理哪些數據呢?
因為Map端進行partition的時候,實際上就相當於指定了每個Reducer要處理的數據(partition就對應了Reducer),所以Reducer在拷貝數據的時候只需拷貝與自己對應的partition中的數據即可。每個Reducer會處理一個或者多個partition。
2、reducer如何知道要從哪台機器上去取map輸出呢?
map任務完成后,它們會使用心跳機制通知它們的application master、因此對於指定作業,application master知道map輸出和主機位置之間的映射關系。reducer中的一個線程定期詢問master以便獲取map輸出主機的位置。知道獲得所有輸出位置。
(2)歸並(Merge)
接下來,關於復制過來的數據如何保存也是比較復雜的。如果數據量比較小,則會復制到reduce任務JVM的內存緩沖區中,一旦緩沖區達到閾值,則歸並后溢出寫到磁盤中,如果指定Combiner,則在歸並期間運行它以降低寫入硬盤的數據量。隨着磁盤上數據的增多,會有一個后台線程將它們歸並為更大的、排好序的文件,為后面的歸並做准備。
實際上,這已經開始了Merge過程,也就是說復制和歸並兩個階段不是完全分開的,是重疊進行的,一邊copy一邊merge。
當所有map的輸出復制完畢后,會進行總的merge(也可以說是排序),這個階段將所有的map輸出進行合並,維持其順序排序。這個過程是循環進行的,比如有50個map的輸出,合並因子為10,那么合並將進行5趟,每趟將10個文件合成一個,最后有5個中間文件。不過作為一個優化,並沒有將這5個文件再歸並為一個。
所謂最后總的merge,得到的是一個整體並且有序的數據塊作為reduce的輸入。這里有一個優化措施:默認情況下,reduce從磁盤獲得所有的數據,可以通過參數來配置使得buffer中的一部分數據可以直接輸送到reduce,從而可以減少了一次磁盤的讀寫。
總結起來看,整個merge過程實際分為三種類型:內存到內存merge、內存到磁盤merge、磁盤到磁盤merge。如果復制過來的數據量比較小,在內存中可以存放,則會發生內存到內存的merge。當內存緩沖區的數據量達到閾值時,會啟動溢出寫的過程,這時將內存中的數據歸並后寫到磁盤,此時發生的就是內存到磁盤的merge。每次觸發溢出寫都會生成一個磁盤文件,隨着磁盤文件的增多,會將多個文件歸並為一個文件,這時發生的是磁盤到磁盤的merge。
因此,經過了復制和歸並兩個階段后,reduce段的shuffle過程就得到了一個整體按鍵有序的數據塊(即<k2,list< v2>>,注意它可以是來自內存和磁盤片段),這就是reduce函數的輸入。
(3)reduce
reduce階段是最后一個階段,merge得到的數據直接輸入reduce函數,輸入是所有的Key和它的Value迭代器,此階段的輸出直接寫到輸出文件系統,一般為HDFS。如果采用HDFS,由於NodeManager也運行數據節點,所以第一個塊副本將被寫到本地磁盤。
至此,整個MapReduce的執行過程就結束了,可以發現,整個shuffle過程確實是其核心所在,是工作得以進行的保證,是“奇跡”發生的地方。
【MapReduce整個過程有幾次排序?分別發生在什么地方?】
通過對以上編程模型的理解,我們可以總結得出實際上MapReduce有三次排序的過程,第一次發生在map端溢出寫之前,后台線程對緩沖區的鍵值對進行sort,這里用的是快速排序。第二次是發生在map端將磁盤上多個溢出文件歸並為一個輸出文件時,這是會將key值相同的歸並的一起,這用的是歸並排序。第三次不難理解,發生在reduce端的merge階段,對給reduce任務要處理的數據進行歸並,這同樣是歸並排序。因此,實際上,只有第一次可以說是真正的排序,而后兩次排序是由歸並帶來的,說其為歸並更加合適。
參考:
作者:ASN_forever
來源:CSDN
原文:https://blog.csdn.net/ASN_forever/article/details/81233547
版權聲明:本文為博主原創文章,轉載請附上博文鏈接!