Shuffle簡介
Shuffle的本意是洗牌、混洗的意思,把一組有規則的數據盡量打亂成無規則的數據。而在MapReduce中,Shuffle更像是洗牌的逆過程,指的是將map端的無規則輸出按指定的規則“打亂”成具有一定規則的數據,以便reduce端接收處理。其在MapReduce中所處的工作階段是map輸出后到reduce接收前,具體可以分為map端和reduce端前后兩個部分。
在shuffle之前,也就是在map階段,MapReduce會對要處理的數據進行分片(split)操作,為每一個分片分配一個MapTask任務。接下來map會對每一個分片中的每一行數據進行處理得到鍵值對(key,value)此時得到的鍵值對又叫做“中間結果”。此后便進入reduce階段,由此可以看出Shuffle階段的作用是處理“中間結果”。
由於Shuffle涉及到了磁盤的讀寫和網絡的傳輸,因此Shuffle性能的高低直接影響到了整個程序的運行效率。
MapReduce Shuffle
Hadoop的核心思想是MapReduce,但shuffle又是MapReduce的核心。shuffle的主要工作是從Map結束到Reduce開始之間的過程。shuffle階段又可以分為Map端的shuffle和Reduce端的shuffle。
Map端的shuffle
下圖是MapReduce Shuffle的官方流程:

因為頻繁的磁盤I/O操作會嚴重的降低效率,因此“中間結果”不會立馬寫入磁盤,而是優先存儲到map節點的“環形內存緩沖區”,在寫入的過程中進行分區(partition),也就是對於每個鍵值對來說,都增加了一個partition屬性值,然后連同鍵值對一起序列化成字節數組寫入到緩沖區(緩沖區采用的就是字節數組,默認大小為100M)。
當寫入的數據量達到預先設置的闕值后便會啟動溢寫出線程將緩沖區中的那部分數據溢出寫(spill)到磁盤的臨時文件中,並在寫入前根據key進行排序(sort)和合並(combine,可選操作)。
溢出寫過程按輪詢方式將緩沖區中的內容寫到mapreduce.cluster.local.dir屬性指定的本地目錄中。當整個map任務完成溢出寫后,會對磁盤中這個map任務產生的所有臨時文件(spill文件)進行歸並(merge)操作生成最終的正式輸出文件,此時的歸並是將所有spill文件中的相同partition合並到一起,並對各個partition中的數據再進行一次排序(sort),生成key和對應的value-list,文件歸並時,如果溢寫文件數量超過參數min.num.spills.for.combine的值(默認為3)時,可以再次進行合並。
至此map端的工作已經全部結束,最終生成的文件也會存儲在TaskTracker能夠訪問的位置。每個reduce task不間斷的通過RPC從JobTracker那里獲取map task是否完成的信息,如果得到的信息是map task已經完成,那么Shuffle的后半段開始啟動。
Reduce端的shuffle
當mapreduce任務提交后,reduce task就不斷通過RPC從JobTracker那里獲取map task是否完成的信息,如果獲知某台TaskTracker上的map task執行完成,Shuffle的后半段過程就開始啟動。Reduce端的shuffle主要包括三個階段,copy、merge和reduce。

每個reduce task負責處理一個分區的文件,以下是reduce task的處理流程:
-
reduce task從每個map task的結果文件中拉取對應分區的數據。因為數據在map階段已經是分好區了,並且會有一個額外的索引文件記錄每個分區的起始偏移量。所以reduce task取數的時候直接根據偏移量去拉取數據就ok。
-
reduce task從每個map task拉取分區數據的時候會進行再次合並,排序,按照自定義的reducer的邏輯代碼去處理。
-
最后就是Reduce過程了,在這個過程中產生了最終的輸出結果,並將其寫到HDFS上。
為什么要排序
-
key存在combine操作,排序之后相同的key放到一塊顯然方便做合並操作。
-
reduce task是按key去處理數據的。 如果沒有排序那必須從所有數據中把當前相同key的所有value數據拿出來,然后進行reduce邏輯處理。顯然每個key到這個邏輯都需要做一次全量數據掃描,影響性能,有了排序很方便的得到一個key對於的value集合。
-
reduce task按key去處理數據時,如果key按順序排序,那么reduce task就按key順序去讀取,顯然當讀到的key是文件末尾的key那么就標志數據處理完畢。如果沒有排序那還得有其他邏輯來記錄哪些key處理完了,哪些key沒有處理完。
雖有千萬種理由需要這么做,但是很耗資源,並且像排序其實我們有些業務並不需要排序。
為什么要文件合並
-
因為內存放不下就會溢寫文件,就會發生多次溢寫,形成很多小文件,如果不合並,顯然會小文件泛濫,集群需要資源開銷去管理這些小文件數據。
-
任務去讀取文件的數增多,打開的文件句柄數也會增多。
-
mapreduce是全局有序。單個文件有序,不代表全局有序,只有把小文件合並一起排序才會全局有序。
Spark的Shuffle
Spark的Shuffle是在MapReduce Shuffle基礎上進行的調優。其實就是對排序、合並邏輯做了一些優化。在Spark中Shuffle write相當於MapReduce 的map,Shuffle read相當於MapReduce 的reduce。
Spark豐富了任務類型,有些任務之間數據流轉不需要通過Shuffle,但是有些任務之間還是需要通過Shuffle來傳遞數據,比如寬依賴的group by key以及各種by key算子。寬依賴之間會划分stage,而Stage之間就是Shuffle,如下圖中的stage0,stage1和stage3之間就會產生Shuffle。

在Spark的中,負責shuffle過程的執行、計算和處理的組件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager隨着Spark的發展有兩種實現的方式,分別為HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle兩種。
Spark Shuffle發展史
Spark 0.8及以前 Hash Based Shuffle
Spark 0.8.1 為Hash Based Shuffle引入File Consolidation機制
Spark 0.9 引入ExternalAppendOnlyMap
Spark 1.1 引入Sort Based Shuffle,但默認仍為Hash Based Shuffle
Spark 1.2 默認的Shuffle方式改為Sort Based Shuffle
Spark 1.4 引入Tungsten-Sort Based Shuffle
Spark 1.6 Tungsten-sort並入Sort Based Shuffle
Spark 2.0 Hash Based Shuffle退出歷史舞台
在Spark的版本的發展,ShuffleManager在不斷迭代,變得越來越先進。
在Spark 1.2以前,默認的shuffle計算引擎是HashShuffleManager。該ShuffleManager而HashShuffleManager有着一個非常嚴重的弊端,就是會產生大量的中間磁盤文件,進而由大量的磁盤IO操作影響了性能。因此在Spark 1.2以后的版本中,默認的ShuffleManager改成了SortShuffleManager。
SortShuffleManager相較於HashShuffleManager來說,有了一定的改進。主要就在於,每個Task在進行shuffle操作時,雖然也會產生較多的臨時磁盤文件,但是最后會將所有的臨時文件合並(merge)成一個磁盤文件,因此每個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取自己的數據時,只要根據索引讀取每個磁盤文件中的部分數據即可。
Hash Shuffle
HashShuffleManager的運行機制主要分成兩種,一種是普通運行機制,另一種是合並的運行機制。合並機制主要是通過復用buffer來優化Shuffle過程中產生的小文件的數量。Hash shuffle是不具有排序的Shuffle。
普通機制的Hash Shuffle
最開始使用的Hash Based Shuffle,每個Mapper會根據Reducer的數量創建對應的bucket,bucket的數量是M * R,M是map的數量,R是Reduce的數量。
如下圖所示:2個core 4個map task 3 個reduce task,會產生4*3=12個小文件。

優化后的Hash Shuffle
普通機制Hash Shuffle會產生大量的小文件(M * R),對文件系統的壓力也很大,也不利於IO的吞吐量,后來做了優化(設置spark.shuffle.consolidateFiles=true開啟,默認false),把在同一個core上的多個Mapper輸出到同一個文件,這樣文件數就變成core * R 個了。
如下圖所示:2個core 4個map task 3 個reduce task,會產生2*3=6個小文件。

Hash shuffle合並機制的問題:
如果 Reducer 端的並行任務或者是數據分片過多的話則 Core * Reducer Task 依舊過大,也會產生很多小文件。進而引出了更優化的sort shuffle。
在Spark 1.2以后的版本中,默認的ShuffleManager改成了SortShuffleManager。
Sort Shuffle
SortShuffleManager的運行機制主要分成兩種,一種是普通運行機制,另一種是bypass運行機制。當shuffle read task的數量小於等於spark.shuffle.sort.bypassMergeThreshold參數的值時(默認為200),就會啟用bypass機制。
普通機制的Sort Shuffle
這種機制和mapreduce差不多,在該模式下,數據會先寫入一個內存數據結構中,此時根據不同的shuffle算子,可能選用不同的數據結構。如果是reduceByKey這種聚合類的shuffle算子,那么會選用Map數據結構,一邊通過Map進行聚合,一邊寫入內存;如果是join這種普通的shuffle算子,那么會選用Array數據結構,直接寫入內存。接着,每寫一條數據進入內存數據結構之后,就會判斷一下,是否達到了某個臨界閾值。如果達到臨界閾值的話,那么就會嘗試將內存數據結構中的數據溢寫到磁盤,然后清空內存數據結構。

在溢寫到磁盤文件之前,會先根據key對內存數據結構中已有的數據進行排序。排序過后,會分批將數據寫入磁盤文件。默認的batch數量是10000條,也就是說,排序好的數據,會以每批1萬條數據的形式分批寫入磁盤文件。
一個task將所有數據寫入內存數據結構的過程中,會發生多次磁盤溢寫操作,也會產生多個臨時文件。最后會將之前所有的臨時磁盤文件都進行合並,由於一個task就只對應一個磁盤文件因此還會單獨寫一份索引文件,其中標識了下游各個task的數據在文件中的start offset與end offset。
SortShuffleManager由於有一個磁盤文件merge的過程,因此大大減少了文件數量,由於每個task最終只有一個磁盤文件所以文件個數等於上游shuffle write個數。
bypass機制的Sort Shuffle
bypass運行機制的觸發條件如下:
1)shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold參數的值,默認值200。
2)不是聚合類的shuffle算子(比如reduceByKey)。
此時task會為每個reduce端的task都創建一個臨時磁盤文件,並將數據按key進行hash然后根據key的hash值,將key寫入對應的磁盤文件之中。當然,寫入磁盤文件時也是先寫入內存緩沖,緩沖寫滿之后再溢寫到磁盤文件的。最后,同樣會將所有臨時磁盤文件都合並成一個磁盤文件,並創建一個單獨的索引文件。
該過程的磁盤寫機制其實跟未經優化的HashShuffleManager是一模一樣的,因為都要創建數量驚人的磁盤文件,只是在最后會做一個磁盤文件的合並而已。因此少量的最終磁盤文件,也讓該機制相對未經優化的HashShuffleManager來說,shuffle read的性能會更好。
而該機制與普通SortShuffleManager運行機制的不同在於:
第一,磁盤寫機制不同;
第二,不會進行排序。也就是說,啟用該機制的最大好處在於,shuffle write過程中,不需要進行數據的排序操作,也就節省掉了這部分的性能開銷。
Spark Shuffle總結
Shuffle 過程本質上都是將 Map 端獲得的數據使用分區器進行划分,並將數據發送給對應的 Reducer 的過程。
Shuffle作為處理連接map端和reduce端的樞紐,其shuffle的性能高低直接影響了整個程序的性能和吞吐量。map端的shuffle一般為shuffle的Write階段,reduce端的shuffle一般為shuffle的read階段。Hadoop和spark的shuffle在實現上面存在很大的不同,spark的shuffle分為兩種實現,分別為HashShuffle和SortShuffle。
HashShuffle又分為普通機制和合並機制,普通機制因為其會產生MR個數的巨量磁盤小文件而產生大量性能低下的Io操作,從而性能較低,因為其巨量的磁盤小文件還可能導致OOM,HashShuffle的合並機制通過重復利用buffer從而將磁盤小文件的數量降低到CoreR個,但是當Reducer 端的並行任務或者是數據分片過多的時候,依然會產生大量的磁盤小文件。
SortShuffle也分為普通機制和bypass機制,普通機制在內存數據結構(默認為5M)完成排序,會產生2M個磁盤小文件。而當shuffle map task數量小於spark.shuffle.sort.bypassMergeThreshold參數的值。或者算子不是聚合類的shuffle算子(比如reduceByKey)的時候會觸發SortShuffle的bypass機制,SortShuffle的bypass機制不會進行排序,極大的提高了其性能。
在Spark 1.2以前,默認的shuffle計算引擎是HashShuffleManager,因為HashShuffleManager會產生大量的磁盤小文件而性能低下,在Spark 1.2以后的版本中,默認的ShuffleManager改成了SortShuffleManager。
SortShuffleManager相較於HashShuffleManager來說,有了一定的改進。主要就在於,每個Task在進行shuffle操作時,雖然也會產生較多的臨時磁盤文件,但是最后會將所有的臨時文件合並(merge)成一個磁盤文件,因此每個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取自己的數據時,只要根據索引讀取每個磁盤文件中的部分數據即可。
Spark與MapReduce Shuffle的異同
-
從整體功能上看,兩者並沒有大的差別。 都是將 mapper(Spark 里是 ShuffleMapTask)的輸出進行 partition,不同的 partition 送到不同的 reducer(Spark 里 reducer 可能是下一個 stage 里的 ShuffleMapTask,也可能是 ResultTask)。Reducer 以內存作緩沖區,邊 shuffle 邊 aggregate 數據,等到數據 aggregate 好以后進行 reduce(Spark 里可能是后續的一系列操作)。
-
從流程的上看,兩者差別不小。 Hadoop MapReduce 是 sort-based,進入 combine和 reduce的 records 必須先 sort。這樣的好處在於 combine/reduce可以處理大規模的數據,因為其輸入數據可以通過外排得到(mapper 對每段數據先做排序,reducer 的 shuffle 對排好序的每段數據做歸並)。以前 Spark 默認選擇的是 hash-based,通常使用 HashMap 來對 shuffle 來的數據進行合並,不會對數據進行提前排序。如果用戶需要經過排序的數據,那么需要自己調用類似 sortByKey的操作。在Spark 1.2之后,sort-based變為默認的Shuffle實現。
-
從流程實現角度來看,兩者也有不少差別。 Hadoop MapReduce 將處理流程划分出明顯的幾個階段:map, spill, merge, shuffle, sort, reduce等。每個階段各司其職,可以按照過程式的編程思想來逐一實現每個階段的功能。在 Spark 中,沒有這樣功能明確的階段,只有不同的 stage 和一系列的 transformation,所以 spill, merge, aggregate 等操作需要蘊含在 transformation中。
