MapReduce的shuffle過程詳解


shuffle概念
  shuffle的本意是洗牌、混洗的意思,把一組有規則的數據盡量打亂成無規則的數據。而在MapReduce中,shuffle更像是洗牌的逆過程,指的是將map端的無規則輸出按指定的規則“打亂”成具有一定規則的數據,以便reduce端接收處理。其在MapReduce中所處的工作階段是map輸出后到reduce接收前,具體可以分為map端和reduce端前后兩個部分。在shuffle之前,也就是在map階段,MapReduce會對要處理的數據進行分片(split)操作,為每一個分片分配一個MapTask任務。接下來map()函數會對每一個分片中的每一行數據進行處理得到鍵值對(key,value),其中key為偏移量,value為一行的內容。此時得到的鍵值對又叫做“中間結果”。此后便進入shuffle階段,由此可以看出shuffle階段的作用是處理“中間結果”。

此處應該想一下,為什么需要shuffle,它的作用是什么?

在了解shuffle的具體流程之前,應先對以下兩個概念有所了解:

block塊(物理划分)
  block是HDFS中的基本存儲單位,hadoop1.x默認大小為64M而hadoop2.x默認塊大小為128M。文件上傳到HDFS,就要划分數據成塊,這里的划分屬於物理的划分(實現機制也就是設置一個read方法,每次限制最多讀128M的數據后調用write進行寫入到hdfs),塊的大小可通過 dfs.block.size配置。block采用冗余機制保證數據的安全:默認為3份,可通過dfs.replication配置。注意:當更改塊大小的配置后,新上傳的文件的塊大小為新配置的值,以前上傳的文件的塊大小為以前的配置值。

split分片(邏輯划分)
  Hadoop中split划分屬於邏輯上的划分,目的只是為了讓map task更好地獲取數據。split是通過hadoop中的InputFormat接口中的getSplit()方法得到的。那么,split的大小具體怎么得到呢?

首先介紹幾個數據量:

  totalSize:整個mapreduce job輸入文件的總大小。

  numSplits:來自job.getNumMapTasks(),即在job啟動時用戶利用 org.apache.hadoop.mapred.JobConf.setNumMapTasks(int n)設置的值,從方法的名稱上看,是用於設置map的個數。但是,最終map的個數也就是split的個數並不一定取用戶設置的這個值,用戶設置的map個數值只是給最終的map個數一個提示,只是一個影響因素,而不是決定因素。

  goalSize:totalSize/numSplits,即期望的split的大小,也就是每個mapper處理多少的數據。但也僅僅是期望。

  minSize:split的最小值,該值可由兩個途徑設置:

     1.通過子類重寫方法protected void setMinSplitSize(long minSplitSize)進行設置。一般情況為1,特殊情況除外

     2.通過配置文件中的mapred.min.split.size進行設置

     最終取兩者中的最大值!

  split計算公式:finalSplitSize=max(minSize,min(goalSize,blockSize))

 

shuffle流程概括:
  因為頻繁的磁盤I/O操作會嚴重的降低效率,因此“中間結果”不會立馬寫入磁盤,而是優先存儲到map節點的“環形內存緩沖區”,在寫入的過程中進行分區(partition),也就是對於每個鍵值對來說,都增加了一個partition屬性值,然后連同鍵值對一起序列化成字節數組寫入到緩沖區(緩沖區采用的就是字節數組,默認大小為100M)。當寫入的數據量達到預先設置的闕值后(mapreduce.map.io.sort.spill.percent,默認0.80,或者80%)便會啟動溢寫出線程將緩沖區中的那部分數據溢出寫(spill)到磁盤的臨時文件中,並在寫入前根據key進行排序(sort)和合並(combine,可選操作)。溢出寫過程按輪詢方式將緩沖區中的內容寫到mapreduce.cluster.local.dir屬性指定的目錄中。當整個map任務完成溢出寫后,會對磁盤中這個map任務產生的所有臨時文件(spill文件)進行歸並(merge)操作生成最終的正式輸出文件,此時的歸並是將所有spill文件中的相同partition合並到一起,並對各個partition中的數據再進行一次排序(sort),生成key和對應的value-list,文件歸並時,如果溢寫文件數量超過參數min.num.spills.for.combine的值(默認為3)時,可以再次進行合並。至此,map端shuffle過程結束,接下來等待reduce task來拉取數據。對於reduce端的shuffle過程來說,reduce task在執行之前的工作就是不斷地拉取當前job里每個map task的最終結果,然后對從不同地方拉取過來的數據不斷地做merge最后合並成一個分區相同的大文件,然后對這個文件中的鍵值對按照key進行sort排序,排好序之后緊接着進行分組,分組完成后才將整個文件交給reduce task處理。

  糾正:分區好像是發生在溢出寫過程之前,也就是當滿足溢出寫條件時,首先進行分區,然后分區內排序,並且選擇性的combine,最后寫出到磁盤。

 

下圖是shuffle的官方流程圖:

 

 

 

  結合下面三張圖可以清楚地理解shuffle過程

 

 

 

 

 

 

 

 

  shuffle詳細流程
  Map端shuffle
  ①分區partition

  ②寫入環形內存緩沖區

  ③執行溢出寫

        排序sort--->合並combiner--->生成溢出寫文件

  ④歸並merge

        

① 分區Partition

在將map()函數處理后得到的(key,value)對寫入到緩沖區之前,需要先進行分區操作,這樣就能把map任務處理的結果發送給指定的reducer去執行,從而達到負載均衡,避免數據傾斜。MapReduce提供默認的分區類(HashPartitioner),其核心代碼如下:

public class HashPartitioner<K, V> extends Partitioner<K, V> {

/** Use {@link Object#hashCode()} to partition. */
  public int getPartition(K key, V value,
    int numReduceTasks) {
      return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

  }
 
getPartition()方法有三個參數,前兩個指的是mapper任務輸出的鍵值對,而第三個參數指的是設置的reduce任務的數量,默認值為1。因為任何整數與1相除的余數肯定是0。也就是說默認的getPartition()方法的返回值總是0,也就是Mapper任務的輸出默認總是送給同一個Reducer任務,最終只能輸出到一個文件中。如果想要讓mapper輸出的結果給多個reducer處理,那么只需要寫一個類,讓其繼承Partitioner類,並重寫getPartition()方法,讓其針對不同情況返回不同數值即可。並在最后通過job設置指定分區類和reducer任務數量即可。

②寫入環形內存緩沖區

因為頻繁的磁盤I/O操作會嚴重的降低效率,因此“中間結果”不會立馬寫入磁盤,而是優先存儲到map節點的“環形內存緩沖區”,並做一些預排序以提高效率,當寫入的數據量達到預先設置的闕值后便會執行一次I/O操作將數據寫入到磁盤。每個map任務都會分配一個環形內存緩沖區,用於存儲map任務輸出的鍵值對(默認大小100MB,mapreduce.task.io.sort.mb調整)以及對應的partition,被緩沖的(key,value)對已經被序列化(為了寫入磁盤)。

③執行溢寫出

一旦緩沖區內容達到閾值(mapreduce.map.io.sort.spill.percent,默認0.80,或者80%),就會會鎖定這80%的內存,並在每個分區中對其中的鍵值對按鍵進行sort排序,具體是將數據按照partition和key兩個關鍵字進行排序,排序結果為緩沖區內的數據按照partition為單位聚集在一起,同一個partition內的數據按照key有序。排序完成后會創建一個溢出寫文件(臨時文件),然后開啟一個后台線程把這部分數據以一個臨時文件的方式溢出寫(spill)到本地磁盤中(如果客戶端自定義了Combiner(相當於map階段的reduce),則會在分區排序后到溢寫出前自動調用combiner,將相同的key的value相加,這樣的好處就是減少溢寫到磁盤的數據量。這個過程叫“合並”)。剩余的20%的內存在此期間可以繼續寫入map輸出的鍵值對。溢出寫過程按輪詢方式將緩沖區中的內容寫到mapreduce.cluster.local.dir屬性指定的目錄中。

合並Combiner
如果指定了Combiner,可能在兩個地方被調用: 
1.當為作業設置Combiner類后,緩存溢出線程將緩存存放到磁盤時,就會調用; 
2.緩存溢出的數量超過mapreduce.map.combine.minspills(默認3)時,在緩存溢出文件合並的時候會調用

合並(Combine)和歸並(Merge)的區別: 
兩個鍵值對<“a”,1>和<“a”,1>,如果合並,會得到<“a”,2>,如果歸並,會得到<“a”,<1,1>>

特殊情況:當數據量很小,達不到緩沖區闕值時,怎么處理?

對於這種情況,目前看到有兩種不一樣的說法:

       ①不會有寫臨時文件到磁盤的操作,也不會有后面的合並。

       ②最終也會以臨時文件的形式存儲到本地磁盤

至於真實情況是怎么樣的,我還不清楚。。。

④歸並merge

當一個map task處理的數據很大,以至於超過緩沖區內存時,就會生成多個spill文件。此時就需要對同一個map任務產生的多個spill文件進行歸並生成最終的一個已分區且已排序的大文件。配置屬性mapreduce.task.io.sort.factor控制着一次最多能合並多少流,默認值是10。這個過程包括排序和合並(可選),歸並得到的文件內鍵值對有可能擁有相同的key,這個過程如果client設置過Combiner,也會合並相同的key值的鍵值對(根據上面提到的combine的調用時機可知)。

溢出寫文件歸並完畢后,Map將刪除所有的臨時溢出寫文件,並告知NodeManager任務已完成,只要其中一個MapTask完成,ReduceTask就開始復制它的輸出(Copy階段分區輸出文件通過http的方式提供給reducer)

壓縮 
寫磁盤時壓縮map端的輸出,因為這樣會讓寫磁盤的速度更快,節約磁盤空間,並減少傳給reducer的數據量。默認情況下,輸出是不壓縮的(將mapreduce.map.output.compress設置為true即可啟動)

Reduce端shuffle
①復制copy

②歸並merge

③reduce

 

結合下面這張圖可以直觀感受reduce端的shuffle過程

 

 

 

①復制copy

Reduce進程啟動一些數據copy線程,通過HTTP方式請求MapTask所在的NodeManager以獲取輸出文件。 
NodeManager需要為分區文件運行reduce任務。並且reduce任務需要集群上若干個map任務的map輸出作為其特殊的分區文件。而每個map任務的完成時間可能不同,因此只要有一個任務完成,reduce任務就開始復制其輸出。

reduce任務有少量復制線程,因此能夠並行取得map輸出。默認線程數為5,但這個默認值可以通過mapreduce.reduce.shuffle.parallelcopies屬性進行設置。

【Reducer如何知道自己應該處理哪些數據呢?】 
因為Map端進行partition的時候,實際上就相當於指定了每個Reducer要處理的數據(partition就對應了Reducer),所以Reducer在拷貝數據的時候只需拷貝與自己對應的partition中的數據即可。每個Reducer會處理一個或者多個partition。

【reducer如何知道要從哪台機器上去的map輸出呢?】 
map任務完成后,它們會使用心跳機制通知它們的application master、因此對於指定作業,application master知道map輸出和主機位置之間的映射關系。reducer中的一個線程定期詢問master以便獲取map輸出主機的位置。知道獲得所有輸出位置。

②歸並merge

 Copy 過來的數據會先放入內存緩沖區中,這里的緩沖區大小要比 map 端的更為靈活,它基於 JVM 的 heap size 設置,因為 Shuffle 階段 Reducer 不運行,所以應該把絕大部分的內存都給 Shuffle 用。

Copy過來的數據會先放入內存緩沖區中,如果內存緩沖區中能放得下這次數據的話就直接把數據寫到內存中,即內存到內存merge。Reduce要向每個Map去拖取數據,在內存中每個Map對應一塊數據,當內存緩存區中存儲的Map數據占用空間達到一定程度的時候,開始啟動內存中merge,把內存中的數據merge輸出到磁盤上一個文件中,即內存到磁盤merge。與map端的溢寫類似,在將buffer中多個map輸出合並寫入磁盤之前,如果設置了Combiner,則會化簡壓縮合並的map輸出。Reduce的內存緩沖區可通過mapred.job.shuffle.input.buffer.percent配置,默認是JVM的heap size的70%。內存到磁盤merge的啟動門限可以通過mapred.job.shuffle.merge.percent配置,默認是66%。

當屬於該reducer的map輸出全部拷貝完成,則會在reducer上生成多個文件(如果拖取的所有map數據總量都沒有內存緩沖區,則數據就只存在於內存中),這時開始執行合並操作,即磁盤到磁盤merge,Map的輸出數據已經是有序的,Merge進行一次合並排序,所謂Reduce端的sort過程就是這個合並的過程,采取的排序方法跟map階段不同,因為每個map端傳過來的數據是排好序的,因此眾多排好序的map輸出文件在reduce端進行合並時采用的是歸並排序,針對鍵進行歸並排序。一般Reduce是一邊copy一邊sort,即copy和sort兩個階段是重疊而不是完全分開的。最終Reduce shuffle過程會輸出一個整體有序的數據塊。

③reduce

當一個reduce任務完成全部的復制和排序后,就會針對已根據鍵排好序的Key構造對應的Value迭代器。這時就要用到分組,默認的根據鍵分組,自定義的可是使用 job.setGroupingComparatorClass()方法設置分組函數類。對於默認分組來說,只要這個比較器比較的兩個Key相同,它們就屬於同一組,它們的 Value就會放在一個Value迭代器,而這個迭代器的Key使用屬於同一個組的所有Key的第一個Key。

在reduce階段,reduce()方法的輸入是所有的Key和它的Value迭代器。此階段的輸出直接寫到輸出文件系統,一般為HDFS。如果采用HDFS,由於NodeManager也運行數據節點,所以第一個塊副本將被寫到本地磁盤。

1、當reduce將所有的map上對應自己partition的數據下載完成后,reducetask真正進入reduce函數的計算階段。由於reduce計算時同樣是需要內存作為buffer,可以用mapreduce.reduce.input.buffer.percent(default 0.0)(源代碼MergeManagerImpl.java:674行)來設置reduce的緩存。

這個參數默認情況下為0,也就是說,reduce是全部從磁盤開始讀處理數據。如果這個參數大於0,那么就會有一定量的數據被緩存在內存並輸送給reduce,當reduce計算邏輯消耗內存很小時,可以分一部分內存用來緩存數據,可以提升計算的速度。所以默認情況下都是從磁盤讀取數據,如果內存足夠大的話,務必設置該參數讓reduce直接從緩存讀數據,這樣做就有點Spark Cache的感覺。

2、Reduce在這個階段,框架為已分組的輸入數據中的每個鍵值對對調用一次 reduce(WritableComparable,Iterator, OutputCollector, Reporter)方法。Reduce任務的輸出通常是通過調用 OutputCollector.collect(WritableComparable,Writable)寫入文件系統的。

 

關於分組的深入理解,請看這篇文章:https://mp.csdn.net/postedit/81778972


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM