
MapReduce確保每個reducer的輸入都按鍵排序。系統執行排序的過程——將map輸出作為輸入傳給reducer——稱為shuffle。shuffle屬於不斷被優化和改進的代碼庫的一部分,從許多方面來看,shuffle是MapReduce的“心臟”,是奇跡發生的地方。事實上,shuffle這個說法並不准確。因為在某些語境中,它只代表reduce任務獲取map輸出的這部分過程。在這里,我們將其理解為從map產生輸出到reduce的消化輸入的整個過程。
map端:
map函數開始產生輸出時,並不是簡單地將它寫到磁盤。這個過程更復雜,它利用緩沖的方式寫到內存緩沖區,並出於效率的考慮進行預排序(步驟1)。map的輸出結果是由collector處理的,所以map端的shuffle過程包含在collect函數對map輸出結果的處理過程中。
每個map任務都有一個環形內存緩沖區,用於存儲任務的輸出,默認情況下,緩沖區的大小為100MB,此值可以通過改變io.sort.mb屬性來調整。一旦緩沖內容達到閥值(io.sort.spill.percent,默認為0.80,或80%),一個后台線程便開始把內容溢寫(spill)到磁盤中。在寫磁盤過程中,map輸出繼續被寫到緩沖區,但如果在此期間緩沖區被填滿,map會阻塞直到寫磁盤過程完成。
final int kvnext = (kvindex + 1) % kvoffsets.length;
do{ //在環形緩沖區中,如果下一個空閑位置同起始位置相等,那么緩沖區已滿 kvfull = kvnext ==kvstart; //環形緩沖區的內容是否達到寫出的閥值 final Boolean kvsoftlimit = ((kvnext > kvend ) ? kvnext – kvend > softRecordLimit : kvend – kvnext <= kvoffsets.length – softRecordLimit ); //達到閥值,寫出緩沖區內容,形成spill文件 if(kvstart == kvend && kvsoftlimit ){ startspill(); } //如果緩沖區滿,則map任務等待寫出過程結束 if( kvfull ){ while ( kvstart != kvend ){ reporter.progress(); spillDone.await(); } } }
寫磁盤將按輪詢方式寫到mapred.local.dir屬性指定的作業特定子目錄中的目錄中。
在collect函數中將緩沖區中的內容寫出前會調用sortAndSpill函數。sortAndSpill函數每被調用一次就會創建一個spill文件(步驟2),然后按照key值對需要寫出的數據進行排序(步驟3),如果有多個reduce任務,則每個map任務都會對其輸出進行分區(partition),即為每個reduce任務建立一個分區。每個分區有許多鍵(及其對應值),但每個鍵對應的鍵/值對記錄都在同一分區中。分區由用戶定義的分區函數控制,但通常用默認的分區器(partitioner,有時也被叫做分區函數)通過哈希函數來分區,按照划分的順序將所有需要寫出的結果溢寫到這個spill中(步驟4或步驟5)。
如果用戶作業配置了combiner類,那么在寫出過程中會先調用combineAndSpill()再寫出,對結果進行進一步的合並(combine)是為了讓map的輸出數據更加緊湊(步驟4)。
但是並不是所有的項目都可以添加combiner函數。例如,計算氣溫的最大值,max(0,20,10,25,15) = max(max(0,20,10),max(25,15)) = max(20,25) = 25,這是沒有問題的,但是計算氣溫的平均值就不行了。mean(0,20,10,25,15) = 14,
而combiner不能取代reduce函數:
mean(mean(0,20,10),mean(25,15)) = mean(10,20) = 15
為什么呢?我們仍然需要reduce函數來處理不同map輸出中具有相同鍵的記錄。但是combiner能有效減少map和reduce之間的數據傳輸量,在MapReduce作業中使用combiner是需要慎重考慮的。
sortAndSpill函數的執行過程可以參考下面sortAndSpill函數的代碼。
//創建spill文件 Path filename = mapOutputFile.getSpillFileForWrite(getTaskID() , numSpills , size ); out = rfs.create(filename); ……. //按照key值對待寫出的數據進行排序 sorter.sort( MapOutputBuffer.this , kvstart , endPosition , reporter ); …….. //按照划分將數據寫入文件 for ( int i = 0 ; i < partitions ; ++ i ){ IFile.Writer< K , V > writer = null ; long segmentStart = out.getPos(); writer = new Writer< K , V >(job , out , keyClass , valClass , codec ); //如果沒有沒有配置combiner類,數據直接寫入文件 if( null == combinerClass ){ ……. } else{ …….. //如果配置了combiner類,先調用combineAdnSpill函數后再寫入文件 combineAndSpill( kvIter , combineInputCounter ); } }
這里需要注意的是,combine函數要做的一般就是reducer要做的事,先處理部分數據,再在reducer中集中處理所有的數據,這樣傳輸給reducer的數據會減少,reducer要做的工作量也會減少。其實這里面還有一個集聚的過程,這個過程不是combine,是系統默認進行的,它會將map的輸出中相同的key的value聚集成value-list(這里的聚集通過實驗發現,map輸出到內存緩沖區,經過sort,partition過程形成排好序的列表,但是key和value還是一樣的,只是順序改變了而已,即使有了combine,combine也是執行reduce函數的動作(這里要看combine的class設置成什么,如果是job.setCombinerClass(Reduce.class),combine則執行reduce函數的動作,而數據在輸入給combine之前沒有進行聚集,所以我認為聚集的過程是在溢寫到磁盤文件中進行的,或者在磁盤中的多個splil文件進行merge合並的時候進行的。)。
顯然,直接將每個map生成的眾多spill文件(因為map過程中,每一次緩沖區寫出都會產生一個spill文件)交給reduce處理不現實。所以在每個map任務結束之后在map的TaskTracker上還會執行合並操作(merge)(步驟6),這個操作的主要目的是將map生成的眾多spill文件中的數據按照划分重新組織,以便於reduce處理。主要做法是針對指定的分區,從各個spill文件中拿出屬於同一個分區的所有數據,然后將它們合並在一起,並寫入一個已分區且已排序的map輸出文件中。最后每個map只生成一個輸出文件。
待唯一的已分區且已排序的map輸出文件寫入最后一條記錄后,map端的shuffle階段就結束了。下面就進入reduce端的shuffle階段。
reduce端:
在reduce端,shuffle階段可以分成三個階段:復制map輸出、排序合並、reduce處理。
map輸出文件位於運行map任務的TaskTracker的本地磁盤(注意,盡管map輸出經常寫到map TaskTracker的本地磁盤,但reduce輸出並不這樣),現在,TaskTtracker需要為分區文件運行reduce任務。更進一步,reduce任務需要集群上若干個map任務的map輸出作為其特殊的分區文件。每個map任務的完成時間可能不同,因此只要有一個任務完成,reduce任務就開始復制其輸出。也就是reduce任務的復制階段(步驟7)。reduce任務有少量復制線程,因此能夠並行取得map輸出,默認值是5個線程。
reducer如何知道要從哪個TaskTracker取得map輸出呢?map任務成功完成后,它們會通知其父TaskTracker狀態已更新,然后TaskTracker進而通知JobTracker。這些通知在心跳通信機制中傳輸。因此,對於指定作業,JobTracker知道map輸出和TaskTracker之間的映射關系。reducer中的一個線程定期詢問JobTracker以便獲取map輸出的位置,直到它獲得所有輸出位置。
由於reducer可能失敗,因此TaskTracker並沒有在第一個reducer檢索到map輸出時就立即從磁盤上刪除它們。相反,TaskTracker會等待,直到JobTracker告知它們可以刪除map輸出,這是作業完成后執行的。
如果map輸出相當小,則會被復制到執行reduce任務的TaskTracker節點的內存中,以便進一步的處理,否則輸出被復制到磁盤中。
一旦內存緩沖區達到閥值大小或達到map輸出閥值,則合並后溢出寫到磁盤中。隨着磁盤上副本的增多,后台線程會將這些從各個map TaskTracker上復制的map輸出文件(無論在內存還是在磁盤上)進行整合,合並為更大的、排好序的文件,並維持數據原來的順序(步驟8)。這會為后面的合並節省一些時間。注意,為了合並,壓縮的map輸出(通過map任務)都必須在內存中被解壓縮。
reduce端的最后階段就是對合並的文件進行reduce處理。reduce TaskTracker從合並的文件中按照順序先拿出一條數據,交給reduce函數處理,然后直接將結果輸出到本地的HDFS上(因為在Hadoop集群上,TaskTracker節點一般也是DataNode節點),接着繼續拿出下一條數據,再進行處理。下面是reduce Task上run函數的部分代碼,從這個函數可以看出整個reduce端的三個步驟。
//復制階段,從map TaskTracker出獲取map輸出 boolean isLocal = “local”.equals(job.get(“mapred.job.tracker”,”local”)); if( !isLocal ){ reduceCopier = new ReduceCopier(umbilical , job ); if ( ! reduceCopier.fetchOutpus() ){ ………. } } //復制階段結束 copyPhase.complete(); //合並階段,將得到的map輸出合並 setPhase(TaskStatus.Phase.SORT); ………. //合並階段結束 sortPhase.complete(); //reduce階段 setPhase(TaskStatus.Phase.REDUCE); … …. Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass() , job); … … //逐個讀出每一條記錄,然后調用Reducer的reduce函數 while ( values.more() ){ reduceInputKeyCounter.increment(1); reducer.reduce(values,getKey() , values , collector , reporter); values.nextKey(); values.informReduceProgress(); } values.informReduceProgress(); } reducer.close(); out.close(reporter); done(umbilical); }
