Shuffle和排序


  MapReduce確保每個reducer的輸入都按鍵排序。系統執行排序的過程——將map輸出作為輸入傳給reducer——稱為shuffle。shuffle屬於不斷被優化和改進的代碼庫的一部分,從許多方面來看,shuffle是MapReduce的“心臟”,是奇跡發生的地方。事實上,shuffle這個說法並不准確。因為在某些語境中,它只代表reduce任務獲取map輸出的這部分過程。在這里,我們將其理解為從map產生輸出到reduce的消化輸入的整個過程。

map端:

  map函數開始產生輸出時,並不是簡單地將它寫到磁盤。這個過程更復雜,它利用緩沖的方式寫到內存緩沖區,並出於效率的考慮進行預排序(步驟1)。map的輸出結果是由collector處理的,所以map端的shuffle過程包含在collect函數對map輸出結果的處理過程中。

  每個map任務都有一個環形內存緩沖區,用於存儲任務的輸出,默認情況下,緩沖區的大小為100MB,此值可以通過改變io.sort.mb屬性來調整。一旦緩沖內容達到閥值(io.sort.spill.percent,默認為0.80,或80%),一個后台線程便開始把內容溢寫(spill)到磁盤中。在寫磁盤過程中,map輸出繼續被寫到緩沖區,但如果在此期間緩沖區被填滿,map會阻塞直到寫磁盤過程完成。

final int kvnext = (kvindex + 1) % kvoffsets.length;
do{
        //在環形緩沖區中,如果下一個空閑位置同起始位置相等,那么緩沖區已滿
        kvfull = kvnext ==kvstart;
        //環形緩沖區的內容是否達到寫出的閥值
        final Boolean kvsoftlimit = ((kvnext > kvend ) ? kvnext – kvend > softRecordLimit : kvend – kvnext <= kvoffsets.length – softRecordLimit );
        //達到閥值,寫出緩沖區內容,形成spill文件
        if(kvstart == kvend && kvsoftlimit ){
            startspill();
        }
        //如果緩沖區滿,則map任務等待寫出過程結束
        if( kvfull ){
              while ( kvstart != kvend ){
                      reporter.progress();
                      spillDone.await();
              }
       }
}

  寫磁盤將按輪詢方式寫到mapred.local.dir屬性指定的作業特定子目錄中的目錄中。

  在collect函數中將緩沖區中的內容寫出前會調用sortAndSpill函數。sortAndSpill函數每被調用一次就會創建一個spill文件(步驟2),然后按照key值對需要寫出的數據進行排序(步驟3),如果有多個reduce任務,則每個map任務都會對其輸出進行分區(partition),即為每個reduce任務建立一個分區。每個分區有許多鍵(及其對應值),但每個鍵對應的鍵/值對記錄都在同一分區中。分區由用戶定義的分區函數控制,但通常用默認的分區器(partitioner,有時也被叫做分區函數)通過哈希函數來分區,按照划分的順序將所有需要寫出的結果溢寫到這個spill中(步驟4或步驟5)。

  如果用戶作業配置了combiner類,那么在寫出過程中會先調用combineAndSpill()再寫出,對結果進行進一步的合並(combine)是為了讓map的輸出數據更加緊湊(步驟4)。

  但是並不是所有的項目都可以添加combiner函數。例如,計算氣溫的最大值,max(0,20,10,25,15) = max(max(0,20,10),max(25,15)) = max(20,25) = 25,這是沒有問題的,但是計算氣溫的平均值就不行了。mean(0,20,10,25,15) = 14,

  而combiner不能取代reduce函數:

  mean(mean(0,20,10),mean(25,15)) = mean(10,20) = 15

  為什么呢?我們仍然需要reduce函數來處理不同map輸出中具有相同鍵的記錄。但是combiner能有效減少map和reduce之間的數據傳輸量,在MapReduce作業中使用combiner是需要慎重考慮的。

  sortAndSpill函數的執行過程可以參考下面sortAndSpill函數的代碼。

//創建spill文件
Path filename = mapOutputFile.getSpillFileForWrite(getTaskID() , numSpills , size );
out = rfs.create(filename);
…….
//按照key值對待寫出的數據進行排序
sorter.sort( MapOutputBuffer.this , kvstart , endPosition , reporter );
……..
//按照划分將數據寫入文件
for ( int i = 0 ; i < partitions ; ++ i ){
        IFile.Writer< K , V >  writer = null ;
        long segmentStart = out.getPos();
        writer = new Writer< K , V >(job , out , keyClass , valClass , codec );
 
//如果沒有沒有配置combiner類,數據直接寫入文件
if( null == combinerClass ){
        …….
}
else{
        ……..
//如果配置了combiner類,先調用combineAdnSpill函數后再寫入文件
combineAndSpill( kvIter , combineInputCounter );
}  }

  這里需要注意的是,combine函數要做的一般就是reducer要做的事,先處理部分數據,再在reducer中集中處理所有的數據,這樣傳輸給reducer的數據會減少,reducer要做的工作量也會減少。其實這里面還有一個集聚的過程,這個過程不是combine,是系統默認進行的,它會將map的輸出中相同的key的value聚集成value-list(這里的聚集通過實驗發現,map輸出到內存緩沖區,經過sort,partition過程形成排好序的列表,但是key和value還是一樣的,只是順序改變了而已,即使有了combine,combine也是執行reduce函數的動作(這里要看combine的class設置成什么,如果是job.setCombinerClass(Reduce.class),combine則執行reduce函數的動作,而數據在輸入給combine之前沒有進行聚集,所以我認為聚集的過程是在溢寫到磁盤文件中進行的,或者在磁盤中的多個splil文件進行merge合並的時候進行的。)。

  顯然,直接將每個map生成的眾多spill文件(因為map過程中,每一次緩沖區寫出都會產生一個spill文件)交給reduce處理不現實。所以在每個map任務結束之后在map的TaskTracker上還會執行合並操作(merge)(步驟6),這個操作的主要目的是將map生成的眾多spill文件中的數據按照划分重新組織,以便於reduce處理。主要做法是針對指定的分區,從各個spill文件中拿出屬於同一個分區的所有數據,然后將它們合並在一起,並寫入一個已分區且已排序的map輸出文件中。最后每個map只生成一個輸出文件。

  待唯一的已分區且已排序的map輸出文件寫入最后一條記錄后,map端的shuffle階段就結束了。下面就進入reduce端的shuffle階段。

reduce端:

  在reduce端,shuffle階段可以分成三個階段:復制map輸出、排序合並、reduce處理。

  map輸出文件位於運行map任務的TaskTracker的本地磁盤(注意,盡管map輸出經常寫到map TaskTracker的本地磁盤,但reduce輸出並不這樣),現在,TaskTtracker需要為分區文件運行reduce任務。更進一步,reduce任務需要集群上若干個map任務的map輸出作為其特殊的分區文件。每個map任務的完成時間可能不同,因此只要有一個任務完成,reduce任務就開始復制其輸出。也就是reduce任務的復制階段(步驟7)。reduce任務有少量復制線程,因此能夠並行取得map輸出,默認值是5個線程。

  reducer如何知道要從哪個TaskTracker取得map輸出呢?map任務成功完成后,它們會通知其父TaskTracker狀態已更新,然后TaskTracker進而通知JobTracker。這些通知在心跳通信機制中傳輸。因此,對於指定作業,JobTracker知道map輸出和TaskTracker之間的映射關系。reducer中的一個線程定期詢問JobTracker以便獲取map輸出的位置,直到它獲得所有輸出位置。

  由於reducer可能失敗,因此TaskTracker並沒有在第一個reducer檢索到map輸出時就立即從磁盤上刪除它們。相反,TaskTracker會等待,直到JobTracker告知它們可以刪除map輸出,這是作業完成后執行的。

  如果map輸出相當小,則會被復制到執行reduce任務的TaskTracker節點的內存中,以便進一步的處理,否則輸出被復制到磁盤中。

  一旦內存緩沖區達到閥值大小或達到map輸出閥值,則合並后溢出寫到磁盤中。隨着磁盤上副本的增多,后台線程會將這些從各個map TaskTracker上復制的map輸出文件(無論在內存還是在磁盤上)進行整合,合並為更大的、排好序的文件,並維持數據原來的順序(步驟8)。這會為后面的合並節省一些時間。注意,為了合並,壓縮的map輸出(通過map任務)都必須在內存中被解壓縮。

  reduce端的最后階段就是對合並的文件進行reduce處理。reduce TaskTracker從合並的文件中按照順序先拿出一條數據,交給reduce函數處理,然后直接將結果輸出到本地的HDFS上(因為在Hadoop集群上,TaskTracker節點一般也是DataNode節點),接着繼續拿出下一條數據,再進行處理。下面是reduce Task上run函數的部分代碼,從這個函數可以看出整個reduce端的三個步驟。

//復制階段,從map TaskTracker出獲取map輸出
boolean isLocal = “local”.equals(job.get(“mapred.job.tracker”,”local”));
if( !isLocal ){
         reduceCopier = new ReduceCopier(umbilical , job );
         if ( ! reduceCopier.fetchOutpus() ){
           ……….
         }
}
//復制階段結束
copyPhase.complete();
//合並階段,將得到的map輸出合並
setPhase(TaskStatus.Phase.SORT);
……….
//合並階段結束
sortPhase.complete();
//reduce階段
setPhase(TaskStatus.Phase.REDUCE);
… ….
Reducer reducer = ReflectionUtils.newInstance(job.getReducerClass() , job);
… …
//逐個讀出每一條記錄,然后調用Reducer的reduce函數
while ( values.more() ){
         reduceInputKeyCounter.increment(1);
         reducer.reduce(values,getKey() , values , collector , reporter);
         values.nextKey();
          values.informReduceProgress();
         }
  values.informReduceProgress();
}
reducer.close();
out.close(reporter);
done(umbilical);
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM