Hadoop深入學習:MapTask詳解


         在本節中,我們主要來學習MapTask的內部實現。 
         
         整體執行流程 
 
         如上圖示,MapTask的整個處理流程分五個階段: 
          ●read階段:通過RecordReader從InputSplit分片中將數據解析成一個個key/value。 
          ●map階段:將由RecordReader解析出的key/value交給map()方法處理,並生成一個個新的key/value。 
          ●collect階段:將map()中新生成key/value由OutpCollector.collect()寫入內存中的環形數據緩沖區。 
          ●spill階段:當環形緩沖區達到一定閥值后,會將數據寫到本地磁盤上,生成一個spill文件。在寫文件之前,會先將數據進行一次本地排序,必要的時候(按配置要求)還會對數據進行壓縮。 
          ●combine階段:當所有數據處理完后,將所有的臨時的spill文件進行一次合並,最終之生成一個數據文件。
     
         接下來我們會對該流程中最重要的collect、spill和combine三個階段進行更深入的學習。 
         Collect過程 
         前階段的map中新生成key/value對后,會調用OutpCollector.collect(key,value),在該方法內部,先調用Partitioner.getPartition()獲取該記錄的分區號,然后將<key,value,partition>傳給MapOutputBuffer.collect()作進一步的處理。 
         MapOutputBuffer內部使用了一個內部的環形的緩沖區來暫時保存用戶的輸出數據,當緩沖區使用率達到一定閥值后,由SpillThread線程將緩沖區中的數據spill到本地磁盤上,當所有的數據處理完畢后,對所有的文件進行合並,最終只生成一個文件。該數據緩沖區直接用想到MapTask的寫效率。 
         環形緩沖區使得collect階段和spill階段可以並行處理。 
         MapOutputBuffer內部采用了兩級索引結構,涉及三個環形的內存緩沖區,分別是kvoffsets、kvindices和kvbuffer,這個環形緩沖區的大小可以通過io.sot.mb來設置,默認大小是100MB,圖示如下: 

          kvoffsets即偏移量索引數組,用於保存key/value在kvindices中的偏移量。一個key/value對在kvoffsets數組中占一個int的大小,而在kvindices數組中站3個int的大小(如上圖示,包括分區號partition,key的起始位置和value的起始位置)。 
          當kvoffsets的使用率超過io.sort.spill.percent(默認為80%)后,便會觸發SpillTread線程將數據spill到磁盤上。 
          kvindices即文職索引數組,用於保存實際的key/value在數據緩沖區kvbuffer中的起始位置。 
          kvbuffer即數據局緩沖區,用於實際保存key/value,默認情況下可使用io.sort.mb的95%,當該緩沖區使用率使用率超過io.sort.spill.percent后,便會觸發SpillTread線程將數據spill到磁盤上。 

         Spill過程 
         在collect階段的執行過程中,當內存中的環形數據緩沖區中的數據達到一定發之后,便會觸發一次Spill操作,將部分數據spill到本地磁盤上。SpillThread線程實際上是kvbuffer緩沖區的消費者,主要代碼如下:
Java代碼   收藏代碼
  1. spillLock.lock();  
  2. while(true){  
  3.    spillDone.sinnal();  
  4.    while(kvstart == kvend){  
  5.       spillReady.await();  
  6.    }  
  7.    spillDone.unlock();  
  8.    //排序並將緩沖區kvbuffer中的數據spill到本地磁盤上  
  9.    sortAndSpill();  
  10.    spillLock.lock;  
  11.    //重置各個指針,為下一下spill做准備  
  12.    if(bufend < bufindex && bufindex < bufstart){  
  13.       bufvoid = kvbuffer.length;  
  14.    }  
  15.    vstart = vend;  
  16.    bufstart = bufend;  
  17. }  
  18. spillLock.unlock();  
  19.            

         sortAndSpill()方法中的內部流程是這樣的: 
         第一步,使用用快速排序算法對kvbuffer[bufstart,bufend)中的數據排序,先對partition分區號排序,然后再按照key排序,經過這兩輪排序后,數據就會以分區為單位聚集在一起,且同一分區內的數據按key有序; 
         第二步,按分區大小由小到大依次將每個分區中的數據寫入任務的工作目錄下的臨時文件中 ,如果用戶設置了Combiner,則寫入文件之前,會對每個分區中的數據做一次聚集操作,比如<key1,val1>和<key1,val2>合並成<key1,<val1,val2>>; (不確定是否正確這句話,有的說是在merge時將相同key的value合成list,待我研究下,)
         第三步,將分區數據的元信息寫到內存索引數據結構SpillRecord中。分區的元數據信息包括臨時文件中的偏移量、壓縮前數據的大小和壓縮后數據的大小。 

         Combine過程 
         當任務的所有數據都處理完后,MapTask會將該任務所有的臨時文件年合並成一個大文件,同時生成相應的索引文件。在合並過程中,是以分區文單位進行合並的。 
         讓每個Task最終生成一個文件,可以避免同時打開大量文件和對小文件產生隨機讀帶來的開銷。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM