MapReduce:詳解Shuffle過程


Shuffle過程,也稱Copy階段。reduce task從各個map task上遠程拷貝一片數據,並針對某一片數據,如果其大小超過一定的閥值,則寫到磁盤上,否則直接放到內存中。

官方的Shuffle過程如上圖所示,不過細節有錯亂,官方圖並沒有說明partition、sort和combiner具體作用於哪個階段。

注意:Shuffle過程是貫穿於map和reduce兩個過程的!

Hadoop的集群環境,大部分的map task和reduce task是執行在不同的節點上的,那么reduce就要取map的輸出結果。那么集群中運行多個Job時,task的正常執行會對集群內部的網絡資源消耗嚴重。雖說這種消耗是正常的,是不可避免的,但是,我們可以采取措施盡可能的減少不必要的網絡資源消耗。另一方面,每個節點的內部,相比於內存,磁盤IO對Job完成時間的影響相當的大,。

所以:從以上分析,shuffle過程的基本要求:

  1.完整地從map task端拉取數據到reduce task端

  2.在拉取數據的過程中,盡可能地減少網絡資源的消耗

  3.盡可能地減少磁盤IO對task執行效率的影響

那么,Shuffle的設計目的就要滿足以下條件:

  1.保證拉取數據的完整性

  2.盡可能地減少拉取數據的數據量

  3.盡可能地使用節點的內存而不是磁盤

map端:

說明:

  map節點執行map task任務生成map的輸出結果。

shuffle的工作內容:

  從運算效率的出發點,map輸出結果優先存儲在map節點的內存中。每個map task都有一個內存緩沖區,存儲着map的輸出結果,當緩沖區塊滿時,需要將緩沖區中的數據以一個臨時文件的方式存到磁盤,當整個map task結束后再對磁盤中這個map task所產生的所有臨時文件做合並,生成最終的輸出文件。最后,等待reduce task來拉取數據。當然,如果map task的結果不大,能夠完全存儲到內存緩沖區,且未達到內存緩沖區的閥值,那么就不會有寫臨時文件到磁盤的操作,也不會有后面的合並。

  詳細過程如下:

  1.map task任務執行,輸入數據的來源是:HDFS的block。當然在mapreduce概念中,map task讀取的是split分片。split與block的對應關系:一對一(默認)。

    此處有必要說明一下block與split

     block(物理划分):

            文件上傳到HDFS,就要划分數據成塊,這里的划分屬於物理的划分,塊的大小可配置(默認:第一代為64M,第二代為128M)可通過 dfs.block.size配置。為保證數據的安  全,block采用冗余                      機制:默認為3份,可通過dfs.replication配置。注意:當更改塊大小的配置后,新上傳的文件的塊大小為新配置的值,以前上傳的文件的塊大小為以前的配置值。

         split(邏輯划分):

               Hadoop中split划分屬於邏輯上的划分,目的只是為了讓map task更好地獲取數據。split是通過hadoop中的InputFormat接口中的getSplit()方法得到的。那么,split的大小具體怎么得到呢?

              首先介紹幾個數據量:

                    totalSize:整個mapreduce job所有輸入的總大小。注意:基本單位是block個數,而不是Bytes個數。

                    numSplits:來自job.getNumMapTasks(),即在job啟動時用戶利用 org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)設置的值,從方法的名稱上看,是用於設置map的個

                                        數。但是,最終map的個數也就是split的個數並不一定取用戶設置的這個值,用戶設置的map個數值只是給最終的map個數一個提示,只是一個影響因素,而不是決定因素。

                    goalSize:totalSize/numSplits,即期望的split的大小,也就是每個mapper處理多少的數據。但是僅僅是期望

                    minSize:split的最小值,該值可由兩個途徑設置:

                            1.子類復寫函數protected void setMinSplitSize(long minSplitSize)設置。一般情況為1,特殊情況除外

                            2.配置文件中的mapred.min.split.size設置

                最終取兩者中的最大值!

             最終:split大小的計算原則:

               finalSplitSize=max(minSize,min(goalSize,blockSize))

            那么,map的個數=totalSize/finalSplitSize

注意:新版的API中InputSplit划分算法不再考慮用戶設定的Map Task個數,而是用mapred.max.split.size(記為maxSize)代替

            即:InputSplit大小的計算公式為:

                            splitSize=max{minSize,min{maxSize,blockSize}}         

 

     接下來就簡答說說怎么根據業務需求,調整map的個數。當我們用hadoop處理大批量的大數據時,一種最常見的情況就是job啟動的mapper數量太多而超出系統限制,導致hadoop拋出異常終止執行。

               解決方案:減少mapper的數量!具體如下:

         1.輸入文件數量巨大,但不是小文件

          這種情況可通過增大每個mapper的inputsize,即增大minSize或者增大blockSize來減少所需的mapper的數量。增大blocksize通常不可行,因為HDFS被hadoop namenode -format之后,

                      blocksize就已經確定了(由格式化時dfs.block.size決定),如果要更改blocksize,需要重新格式化HDFS,這樣當然會丟失已有的數據。所以通常情況下只能增大minSize,即增大mapred.min.

                 split.size的值。

                     2.輸入文件數量巨大,且都是小文件

       所謂小文件,就是單個文件的size小於blockSize。這種情況通過增大mapred.min.split.size不可行,需要使用FileInputFormat衍生的CombineFileInputFormat將多個input path合並成一個

                     InputSplit送給mapper處理,從而減少mapper的數量。

                 增加mapper的數量,可以通過減少每個mapper的輸入做到,即減小blockSize或者減少mapred.min.split.size的值。

                 block與split關系說清楚了,那先說到這里,還是回到shuffle的過程解說中來!

         2.  map執行后,得到key/value鍵值對。接下來的問題就是,這些鍵值對應該交給哪個reduce做?注意:reduce的個數是允許用戶在提交job時,通過設置方法設置的!

    MapReduce提供partitioner接口解決上述問題。默認操作是:對key hash后再以reduce task數量取模,返回值決定着該鍵值對應該由哪個reduce處理。

           這種默認的取模方式只是為了平均reduce的處理能力,防止數據傾斜,保證負載均衡。

           如果用戶自己對Partition有需求,可以自行定制並設置到job上。

           接下來,需要將key/value以及Partition結果都寫入到緩沖區,緩沖區的作用:批量收集map結果,減少磁盤IO的影響。

           當然,寫入之前,這些數據都會被序列化成字節數組。而整個內存緩沖區就是一個字節數組。

           這個內存緩沖區是有大小限制的,默認100MB。當map task的輸出結果很多時,就可能撐爆內存。需將緩沖區的數據臨時寫入磁盤,然后重新利用這塊緩沖區。

           從內存往磁盤寫數據被稱為Spill(溢寫),由單獨線程完成,不影響往緩沖區寫map結果的線程。溢寫比例:spill.percent(默認0.8)。

           當緩沖區的數據達到閥值,溢寫線程啟動,鎖定這80MB的內存,執行溢寫過程。剩下的20MB繼續寫入map task的輸出結果。互不干涉!

           當溢寫線程啟動后,需要對這80MB空間內的key做排序(Sort)。排序是mapreduce模型的默認行為,也是對序列化的字節做的排序。排序規則:字典排序!

           map task的輸出結果寫入內存后,當溢寫線程未啟動時,對輸出結果並沒有做任何的合並。從官方圖可以看出,合並是體現在溢寫的臨時磁盤文件上的,且這種合並是對不同的

           reduce端的數值做的合並。所以溢寫過程一個很重要的細節在於,如果有很多個key/value對需要發送到某個reduce端,那么需要將這些鍵值對拼接到一塊,減少與partition相

           關的索引記錄。如果client設置過Combiner,其會將有相同key的key/value對的value加起來,減少溢寫到磁盤的數據量。注意:這里的合並並不能保證map結果中所有的相同

           的key值的鍵值對的value都合並了,它合並的范圍只是這80MB,它能保證的是在每個單獨的溢寫文件中所有鍵值對的key值均不相同!

              溢寫生成的臨時文件的個數隨着map輸出結果的數據量變大而增多,當整個map task完成,內存中的數據也全部溢寫到磁盤的一個溢寫文件。

           也就是說,不論任何情況下,溢寫過程生成的溢寫文件至少有一個!但是最終的文件只能有一個,需要將這些溢寫文件歸並到一起,稱為merge。

           merge是將所有的溢寫文件歸並到一個文件,結合上面所描述的combiner的作用范圍,歸並得到的文件內鍵值對有可能擁有相同的key,這個過程如果client設置過

           Combiner,也會合並相同的key值的鍵值對,如果沒有,merge得到的就是鍵值集合,如{“aaa”, [5, 8, 2, …]}

           注意:combiner的合理設置可以提高效率,但是如果使用不當會影響效率!

  3至此,map端的所有工作都已經結束!

Reduce端:

  當mapreduce任務提交后,reduce task就不斷通過RPC從JobTracker那里獲取map task是否完成的信息,如果獲知某台TaskTracker上的map task執行完成,Shuffle的后半段過程就開始啟動。其實呢,reduce task在執行之前的工作就是:不斷地拉取當前job里每個map task的最終結果,並對不同地方拉取過來的數據不斷地做merge,也最終形成一個文件作為reduce task的輸入文件。

  

      1.Copy過程,簡單地拉取數據。Reduce進程啟動一些數據copy線程(Fether),通過HTTP方式請求map task所在的TaskTracker獲取map task的輸出文件。因為map task早已結束,這些文件就歸TaskTracker管理在本地磁盤。

  2.Merge過程。這里的merge如map端的merge動作,只是數組中存放的是不同map端copy過來的數值。Copy過來的數據會先放入內存緩沖區中,這里緩沖區的大小要比map端的更為靈活,它是基於JVM的heap size設置,因為shuffler階段reducer不運行,所以應該把絕大部分的內存都給shuffle用。

  merge的三種形式:

      內存到內存、內存到磁盤、磁盤到磁盤

  默認情況下,第一種形式不啟用。當內存中的數據量達到一定的閥值,就啟動內存到磁盤的merge。與map端類似,這也是溢寫過程,當然如果這里設置了Combiner,也是會啟動的,然后在磁盤中生成了眾多的溢寫文件。第二種merge方式一直在運行,直到沒有map端的數據時才結束,然后啟動第三種磁盤到磁盤的merge方式生成最終的那個文件。

  3.reducer的輸入文件。不斷地merge后,最后會生成一個“最終文件”。這個最終文件可能在磁盤中也可能在內存中。當然我們希望它在內存中,直接作為reducer的輸入,但默認情況下,這個文件是存放於磁盤中的。當reducer的輸入文件已定,整個shuffle才最終結束。然后就是reducer執行,把結果存放到HDFS上。

感謝博主:http://langyu.iteye.com/blog/992916


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM