組成部分
Shuffle階段分為兩部分:Map端和Reduce端。
Sort階段就是對Map端輸出的key進行排序。
第一部分:Map端Shuffle
對於輸入文件,會進行分片,對於一個split,有一個map任務進行處理,每個Map在內存中都有一個緩存區,map的輸出結果會先放到這個緩沖區中,在緩沖區中,會進行預排序(即sort和comibner),以提高效率。
緩沖區默認大小是100MB(可以通過io.sort.mb屬性更改大小),當緩沖區中的數據達到特定的閾值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent默認是0.80)時,系統會啟動一個后台線程把緩沖區的內容spill(溢寫)到磁盤。溢出到磁盤的一個臨時文件中,即80%的內容成為一個臨時文件。當這80%的內容溢出時,map會繼續向剩余的20%緩沖區中輸出。
spill線程在把緩沖區中的數據寫到磁盤前,會進行一個二次快速排序,首先根據數據所屬的Partition排序,然后每個Partition中再按Key排序。輸出包括一個索引文件和數據文件。如果設定了Combiner,將在排序輸出的基礎上進行。
Comibner就是一個Mini Reducer,在執行Map任務的節點本身運行,對Map的輸出做一次簡單Reduce,使得Map'de輸出更緊湊,更少的數據會被寫入磁盤和傳送到Reduce端。
一個Map任務會產生多個spill文件,在Map任務完成前,所有的spill文件將會歸並排序為一個索引文件和數據文件。當spill文件歸並完成后,Map將刪除所有的臨時文件,並告知TaskTracker任務已完成。
對寫入到磁盤的數據可以選擇采取壓縮的方式,如果需要壓縮,則需要設置mapred.compress.map.output為true。
還有一個Partition的概念,一個臨時文件是進行了分區的,並且分區的數量由reduce的數量決定,不同的分區傳給不同的reduce。
第二部分:Reduce端Shuffle
Reduce端通過HTTP獲取Map端的數據,只要有一個map任務完成,Reduce任務就開始復制它的輸出,這稱為copy階段。
JobTracker知道Map輸出與TaskTracker的映射關系,Reduce端有一個線程間歇地向JobTracker詢問Map輸出的地址,直到把所有的數據都獲取到。
如果map輸出比較小,他們被復制到Reduce的內存中,如果緩沖區空間不足,會被復制到磁盤上。復制的數據放在磁盤上,后台線程會進行歸並為更大的排序文件,對於壓縮文件,系統會自動解壓到內存方便歸並。
當所有的Map輸出被復制后,Reduce任務進入排序階段(確切的說是歸並階段),這個過程會重復多次。Merge有三種形式:內存到內存,內存到磁盤,磁盤到磁盤。
內存到內存默認不啟用;內存到磁盤的方式也會產生溢寫,如果設置了Combiner,此時也會啟用,在磁盤上生成多個溢寫文件;磁盤到磁盤會生成一個最終的文件作為Reduce的輸入。