MapRduce保證reducer的輸入是按照key進行排過序的,原因和歸並排序有關,在reducer接收到不同的mapper輸出的有序數據后,需要再次進行排序,然后是分組排序,如果mapper輸出的是有序數據,將減少reducer階段排序的時間消耗.一般將排序以及Map的輸出傳輸到Reduce的過程稱為混洗(shuffle).Shuffle是MapReduce過程的核心,了解Shuffle非常有助於理解MapReduce的工作原理。如果你不知道MapReduce里的Shuffle是什么,那么請看下面這張圖
上圖中明顯分為兩個大部分Map任務和Reduce任務,圖中的紅色虛線代表數據流的一個過程,下面分兩部分進行說明:
MAP部分
每一個mapper都有一個circular buffer(環形緩存),環形緩沖區是一個先進先出的循環緩沖區,不用頻繁的分配內存,而且在大多數情況下,內存的反復使用也使得我們能用更少的內存塊做更多的事,默認情況下大小為100M(可以通過mapreduce.task.io.sort.mb來進行修改).Mapper的輸出會首先寫進這個緩存里面,當里面的內容達到一個閾值(mapreduce.map.sort.spill.percent,默認情況下為80%),一個后台線程就會開始向磁盤spill這些內容,同時Map將繼續向該緩存區寫內容.當緩存區寫滿時,Map被阻塞,直到spill過程完成才會被喚醒.Spills 將會循環寫進 mapreduce.cluster.local.dir定義的目錄下面,也就是說會產生多個spill磁盤文件.
在spill過程寫進磁盤之前還會做一些事情,步驟如下:
(1) 首先線程會先把寫的內容分成多個分組,這個和reducer的分組是一致的,partitioner的算法請參考我的另外一篇文章:hadoop之定制自己的Partitioner
(2) 針對每一個分組,線程會實現內存的排序,排序的過程請參考另外一篇文章: hadoop之定制自己的sort過程
(3) 如果存在combiner的話,combiner會在sort之后,在每一個分組進行執行,combiner的執行會導致寫到磁盤的數據減少.
每一次環形緩存達到閾值,就會產生一個spill的文件,也就是說可能會產生很多個spill文件.在任務結束之前,這些文件會被合並為統一的帶有分組和排好序的文件作為輸出.其中mapreduce.task.io.sort.factor定義了一次合並的文件的最大個數,默認的個數為10.另外如果文件個數大於3的話,combiner會再次被調用.如果僅有2個或者更少的文件,沒有必要調用combiner了.
如果mapper輸出的文件相對較大,不利於在網絡中傳輸,可以考慮下壓縮,既能減少寫入磁盤的時間開銷,也能減小傳輸的壓力.將mapreduce.map.output.compress設置為true即可,使用的壓縮算法的庫為mapreduce.map.output.compress.codec.是否使用壓縮要看減小的網絡傳輸和解壓縮時間的對比,如果提升不大,則沒有壓縮的必要.
Reduce部分
一個reducer的partition輸入,可能來自集群的很多個mapper的輸出,每個mapper的數據到達時間是不定的,reduce任務一旦接收到數據,立刻開始拷貝,而且這些拷貝的操作是由不同的線程並行運行的,這樣就可以接收來自不同的mapper的輸出數據.通過設置mapreduce.reduce.shuffle.parallelcopies,可以實現線程數量的改變,默認的情況下該值為5.
如果map的輸出文件很小,那么它們就會被拷貝到reduce任務的JVM內存中,否則會寫入到磁盤.當在JVM內存中的數據,達到一個閾值時(由mapreduce.reduce.shuffle.merge.percent屬性控制)或者map的輸出達到一個閾值時(由mapreduce.reduce.merge.inmem.threshold屬性控制),這些map輸出數據開始merge,並spill到磁盤中,如果mapper輸出文件存在壓縮,則會在內存中被解壓縮.如果merge過程中有combiner,則會被再次運行,以此減少寫入磁盤的數據.當磁盤上的文件逐漸增多時,后台程序會將多個spill文件sort和merge成更大的文件.
當所有的map輸出文件都已經被拷貝完成,reducer進入到sort階段,也就是混合map輸出文件,使數據保持有序的狀態.混合的過程采用round的方式,例如如果有50個map輸出文件,而混合因子是10( mapreduce.task.io.sort.facto),將會有5rounds去混合所有的文件,如下圖所示:
值得注意的是,最后的一次round可以混合內存和磁盤的數據段.