介紹
不論MapReduce還是RDD,shuffle都是非常重要的一環,也是影響整個程序執行效率的主要環節,但是在這兩個編程模型里面shuffle卻有很大的異同。
shuffle的目的是對數據進行混洗,將各個節點的同一類數據匯集到某一個節點進行計算,為了就是分布式計算的可擴展性。
可能大家多MR的shuffle比較清楚,相對來說MR的shuffle是比較清晰和粗暴的。shuffle階段是介於Map和Reduce的一個中間階段。

具體詳情見:
高威:MapReduce編程模型而Spark的shuffle過程時出現在ShuffleMapTask過程中,和MR的map端shuffle以及reduce端shuffle類似,spark由於是一條鏈路不落盤的RDD開發模式,所以Spark的shuffle分為shuffle的讀操作和shuffle的寫操作。
區別
MR的shuffle分為:
- Map端的shuffle,主要是Partition、Collector、Sort、Spill、Merge幾個階段;
- Reduce端的shuffle,主要是Copy、Merge、Reduce幾個階段。
但是MR的shuffle有一個很重要的特點:全局排序。
MR的shuffle過程中在Map端會進行一個Sort,也會在Reduce端對Map的結果在進行一次排序。這樣子最后就變成了有多個溢出文件(單個溢出文件是有序的,但是整體上是無序的),那么最后在merge成一個輸出文件時還需再排序一次,同時,reduce在進行merge的時候同樣需要再次排序(因為它從多個map處拉數據)
注意:這個有序是指Key值有序,對於value依舊是無序的,如果想對value進行排序,需要借鑒二次排序的算法。
二次排序的理論是利用MR的全局排序的功能,將value和key值合並,作為一個新的Key值,然后由MR的機制進行Key的排序,這個方法類似於在處理數據傾斜的時候在Key值上加隨機數的方法。
這個是排序的一種思想----合並排序。先進行小范圍排序,最后再大范圍排序。最后的復雜度為O(nlog(n)),比普通排序復雜度O(n的平方)快。
但是問題是,排序是一個很耗資源的一種操作,而且很多的業務場景,是不需要進行排序的。所以MR的全局排序在很多的業務場景中是一個非常耗資源而且無用的操作。
Spark的shuffle(對排序和合並進行了優化):
為了避免不必要的排序,Spark提供了基於Hash的、基於排序和自定義的shuffleManager操作。
1、shuffle的讀寫操作
- 基於Hash的shuffle的寫操作(一種是普通運行機制,另一種是合並的運行機制)。

- Mapper根據Reduce的數量創建相應的bucket(類似於MR的分區),bucket是一個抽象的概念,數量為M*R;
- Mapper生成的結果會根據設置的partition算法填充到每個bucket中;
- Reduce啟動的時候,Mapper優先從本地或者然后遠端(數據的本地性)取相應的bucket作為Reduce的輸入進行處理
- Spark不在reduce端不做merge和sort,而是使用聚合(aggregator)。聚合是一個HashMap,將shuffle讀到的每一個健值對更新或者插入到HashMap中,這樣就不需要把所有的健值對進行merge sort,而是來一個插入一個。
基於Hash的shuffle會出現一個問題:文件個數是M*R個,對於文件系統是一個負擔,同時在shuffle的數據量不大的情況下,文件個數過多,隨機寫入會嚴重降低I/O的性能。同時如果后續任務很大的情況下,這些小文件所占用的緩存也是一個很大的開銷。
后續HashShuffle優化合並的運行機制,避免大量文件產生,把同一個core上的多個Mapper文件輸出到同一個文件里面,這樣文件個數就變成了R。
在Spark 1.2以后的版本中,默認的ShuffleManager改成了SortShuffleManager。
- 基於排序的shuffle的讀寫操作(spill和index)。
根據不同的shuffle算子(是否combine),可選擇不同的數據結構。
如果不需要combine,會選擇Array數據結構,直接寫到內存,然后溢寫到磁盤;
如果需要combine,則選擇Map數據結構,一邊對Map進行排序聚合,一邊寫到內存,然后溢寫到磁盤,最后在合並聚合。

- ShuffleMapTask不在為每個任務創建單獨的文件,而是將所有的結果寫到同一個文件中;
- 生成index文件進行索引,通過索引避免大量的文件的產生。
- 判斷在Map端是否進行合並(combine),例如sortByKey等是需要排序的,但是HashMap是不會進行排序的,如果需要合並則在Map端進行聚合排序,如果不需要則HashMap;
- 排序中的Map內存超過域值,則溢寫到磁盤。當所有的數據處完后,通過merge將內存和磁盤上的文件進行合並。
- 在下一個調度階段,shuffle read task拉取自己的數據時,只要根據索引讀取每個磁盤文件中的部分數據即可。
SortShuffle數據會先寫入一個內存數據結構中,此時根據不同的shuffle算子,可以選用不同的數據結構。如果是有聚合操作的shuffle算子,就是用map的數據結構(邊聚合邊寫入內存),如果是join的算子,就使用array的數據結構(直接寫入內存)。接着,每寫一條數據進入內存數據結構之后,就會判斷是否達到了某個臨界值,如果達到了臨界值的話,就會嘗試的將內存數據結構中的數據溢寫到磁盤,然后清空內存數據結構。
在溢寫到磁盤文件之前,會先根據key對內存數據結構中已有的數據進行排序,排序之后,會分批將數據寫入磁盤文件。默認的batch數量是10000條,也就是說,排序好的數據,會以每批次1萬條數據的形式分批寫入磁盤文件,寫入磁盤文件是通過Java的BufferedOutputStream實現的。BufferedOutputStream是Java的緩沖輸出流,首先會將數據緩沖在內存中,當內存緩沖滿溢之后再一次寫入磁盤文件中,這樣可以減少磁盤IO次數,提升性能。
此時task將所有數據寫入內存數據結構的過程中,會發生多次磁盤溢寫,會產生多個臨時文件,最后會將之前所有的臨時文件都進行合並,最后會合並成為一個大文件。最終只剩下兩個文件,一個是合並之后的數據文件,一個是索引文件(標識了下游各個task的數據在文件中的start offset與end offset)。最終再由下游的task根據索引文件讀取相應的數據文件。
當非聚合的情況下,同時分區數少於設定的閾值,會啟動ByPass 機制,bypass的就是不排序,還是用hash去為key分磁盤文件,分完之后再合並,形成一個索引文件和一個合並后的key hash文件。省掉了排序的性能。
總結
- 功能上,MR的shuffle和Spark的shuffle是沒啥區別的,都是對Map端的數據進行分區,要么聚合排序,要么不聚合排序,然后Reduce端或者下一個調度階段進行拉取數據,完成map端到reduce端的數據傳輸功能。
- 方案上,有很大的區別,MR的shuffle是基於合並排序的思想,在數據進入reduce端之前,都會進行sort,為了方便后續的reduce端的全局排序,而Spark的shuffle是可選擇的聚合,特別是1.2之后,需要通過調用特定的算子才會觸發排序聚合的功能。
- 流程上,MR的Map端和Reduce區分非常明顯,兩塊涉及到操作也是各司其職,而Spark的RDD是內存級的數據轉換,不落盤,所以沒有明確的划分,只是區分不同的調度階段,不同的算子模型。
- 數據拉取,MR的reduce是直接拉去Map端的分區數據,而Spark是根據索引讀取,而且是在action觸發的時候才會拉去數據。
- HashShuffle,雖然MR和shuffle讀都會進行HashShuffle,但是如果在shuffle讀沒有combine操作的時候同時分區數少於設定的閾值,則不會在HashMap的時候預先對分區中所有的健值對進行merge和sort,從而省下了排序過程。