1、spark shuffle:spark 的 shuffle 主要發生在 DAG 視圖中的 stage 和 stage 之間,也就是RDD之間是寬依賴的時候,會發生 shuffle。
補充:spark shuffle在很多地方也會參照mapreduce一樣,將它分成兩個階段map階段、reduce階段。map階段就是數據還在各個節點上的階段,reduce階段就是相同的key被拉到了相同的節點上后的階段。
2、shuffle對於spark性能的影響:shuffle過程包括大量的磁盤IO、序列化、網絡數據傳輸等操作。因此,shuffle調優可以讓spark作業性能得到提高。但是值得注意的是,影響spark作業性能的主要因素還是:代碼開發、資源參數、數據傾斜,而shuffle調優只能在spark作業當中占到一小部分而已。
3、spark shuffle的四種策略:
shuffle manager 就是 負責管理 shuffle 過程的執行、計算、處理的組件,即shuffle管理器。
ShuffleManager隨着Spark的發展有兩種實現的方式,分別為HashShuffleManager(spark1.2之前使用)和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle兩種。
在Spark 1.2以前,默認的shuffle計算引擎是HashShuffleManager。
HashShuffleManager有着一個非常嚴重的弊端,就是會產生大量的中間磁盤文件,進而由大量的磁盤IO操作影響了性能。因此在Spark 1.2以后的版本中,默認的ShuffleManager改成了SortShuffleManager。
SortShuffleManager相較於HashShuffleManager來說,有了一定的改進。主要就在於,每個Task在進行shuffle操作時,雖然也會產生較多的臨時磁盤文件,但是最后會將所有的臨時文件合並(merge)成一個磁盤文件,因此每個Task就只有一個磁盤文件。在下一個stage的shuffle read task拉取自己的數據時,只要根據索引讀取每個磁盤文件中的部分數據即可。
(1)未經優化的hashShuffle
上游的stage的task對相同的key執行hash算法,從而將相同的key都寫入到一個磁盤文件中,而每一個磁盤文件都只屬於下游stage的一個task。在將數據寫入磁盤之前,會先將數據寫入到內存緩沖,當內存緩沖填滿之后,才會溢寫到磁盤文件中。但是這種策略的不足在於,下游有幾個task,上游的每一個task都就都需要創建幾個臨時文件,每個文件中只存儲key取hash之后相同的數據,導致了當下游的task任務過多的時候,上游會堆積大量的小文件。
(2)經過優化的hashShuffle
在shuffle write過程中,上游stage的task就不是為下游stage的每個task創建一個磁盤文件了。此時會出現shuffleFileGroup的概念,每個shuffleFileGroup會對應一批磁盤文件,磁盤文件的數量與下游stage的task數量是相同的。一個Executor上有多少個CPU core,就可以並行執行多少個task。而第一批並行執行的每個task都會創建一個shuffleFileGroup,並將數據寫入對應的磁盤文件內。當Executor的CPU core執行完一批task,接着執行下一批task時,下一批task就會復用之前已有的shuffleFileGroup,包括其中的磁盤文件。也就是說,此時task會將數據寫入已有的磁盤文件中,而不會寫入新的磁盤文件中。因此,consolidate機制允許不同的task復用同一批磁盤文件,這樣就可以有效將多個task的磁盤文件進行一定程度上的合並,從而大幅度減少磁盤文件的數量,進而提升shuffle write的性能。
注意:如果想使用優化之后的ShuffleManager,需要將:spark.shuffle.consolidateFiles調整為true。(當然,默認是開啟的)
未經優化: 上游的task數量:m , 下游的task數量:n , 上游的executor數量:k (m>=k) , 總共的磁盤文件:m*n
優化之后的: 上游的task數量:m , 下游的task數量:n , 上游的executor數量:k (m>=k) , 總共的磁盤文件: k*n
(3)SortShuffle普通機制:
shuffle write:mapper階段,上一個stage得到最后的結果寫出
shuffle read :reduce階段,下一個stage拉取上一個stage進行合並
在普通模式下,數據會先寫入一個內存數據結構中,此時根據不同的shuffle算子,可以選用不同的數據結構。如果是由聚合操作的shuffle算子,就是用map的數據結構(邊聚合邊寫入內存),如果是join的算子,就使用array的數據結構(直接寫入內存)。接着,每寫一條數據進入內存數據結構之后,就會判斷是否達到了某個臨界值,如果達到了臨界值的話,就會嘗試的將內存數據結構中的數據溢寫到磁盤,然后清空內存數據結構。
在溢寫到磁盤文件之前,會先根據key對內存數據結構中已有的數據進行排序,排序之后,會分批將數據寫入磁盤文件。默認的batch數量是10000條,也就是說,排序好的數據,會以每批次1萬條數據的形式分批寫入磁盤文件,寫入磁盤文件是通過Java的BufferedOutputStream實現的。BufferedOutputStream是Java的緩沖輸出流,首先會將數據緩沖在內存中,當內存緩沖滿溢之后再一次寫入磁盤文件中,這樣可以減少磁盤IO次數,提升性能。
此時task將所有數據寫入內存數據結構的過程中,會發生多次磁盤溢寫,會產生多個臨時文件,最后會將之前所有的臨時文件都進行合並,最后會合並成為一個大文件。最終只剩下兩個文件,一個是合並之后的數據文件,一個是索引文件(標識了下游各個task的數據在文件中的start offset與end offset)。最終再由下游的task根據索引文件讀取相應的數據文件。
自己的理解:sortshuffle機制就是先將數據寫入到內存緩存,緩存達到閾值進行磁盤溢寫,磁盤溢寫是分批次進行的,並且在溢寫之前對key進行排序。這樣就形成了很多的批次排序磁盤文件,然后將這些批次key排序磁盤文件進行合並,就形成了一個task的總的key的排序文件,在為這個key排序文件創建個索引文件,這樣下一個stage的task就可以根據索引去文件中拉取自己的數據了。
(4)SortShuffle 的 ByPass 機制:
此時上游stage的task會為每個下游stage的task都創建一個臨時磁盤文件,並將數據按key進行hash然后根據key的hash值,將key寫入對應的磁盤文件之中。當然,寫入磁盤文件時也是先寫入內存緩沖,緩沖寫滿之后再溢寫到磁盤文件的。最后,同樣會將所有臨時磁盤文件都合並成一個磁盤文件,並創建一個單獨的索引文件。
自己的理解:bypass的就是不排序,還是用hash去為key分磁盤文件,分完之后再合並,形成一個索引文件和一個合並后的key hash文件。省掉了排序的性能。
bypass機制與普通SortShuffleManager運行機制的不同在於:
a、磁盤寫機制不同;
b、不會進行排序。
也就是說,啟用該機制的最大好處在於,shuffle write過程中,不需要進行數據的排序操作,也就節省掉了這部分的性能開銷。
觸發bypass機制的條件:
shuffle map task的數量小於 spark.shuffle.sort.bypassMergeThreshold 參數的值(默認200)或者不是聚合類的shuffle算子(比如groupByKey)
參考博客:https://blog.csdn.net/qichangjian/article/details/88039576