Shuffle 概述
影響Spark性能的大BOSS就是shuffle,因為該環節包含了大量的磁盤IO、序列化、網絡數據傳輸等操作。
因此,如果要讓作業的性能更上一層樓,就有必要對 shuffle 過程進行調優。
當然,影響 Spark 性能的還有代碼開發、參數設置數以及數據傾斜的解決等,甚至這部分才是大頭,shuffle 調優只能在整個 Spark 的性能調優中占到一小部分而已。
所以寫好一個優秀高效的代碼才是關鍵。
shuffle 調優 只是錦上添花而已。
未經優化的HashShuffleManager
這是spark1.2版本之前,最早使用的shuffle方法,這種shuffle方法不要使用,只是用來對比改進后的shuffle方法。
上游每個task 都輸出下游task個數的結果文件,下游每個task去上游task輸出的結果文件中獲取對應自己的文件。
缺點:生成文件個數過多,生成和傳輸文件數量等於 上游task數量 * 下游task數量 個文件。
對應spark參數如下:
參數 | 值 |
---|---|
spark.shuffle.manager | hash |
spark.shuffle.consolidateFiles | false |
經過優化以后的HashShufferManager
上游1個Executor所有task順序輸出下游task個數的結果文件,下游每個task去上游task輸出的結果文件中獲取對應自己的。
這個過程中有一個shuffleFileGroup 的概念,每個 shuffleFileGroup 會對應一批磁盤文件,磁盤文件的數量與下游 stage 的 task 數量是相同的。
一個 Executor 上有多少個 CPU core,就可以並行執行多少個task。而第一批並行執行的每個 task 都會創建一個shuffleFileGroup,並將數據寫入對應的磁盤文件內。
結合下圖可知,優化后的HashShufferManager減少了中間文件輸出,生成和傳輸 上游executor_num * 下游task數量 個文件
對應spark參數如下:
參數 | 值 |
---|---|
spark.shuffle.manager | hash |
spark.shuffle.consolidateFiles | true |
SortShuffleManager 運行原理
SortShuffleManager 有兩種的運行機制:
普通運行機制
bypass運行機制
當 shuffle read task 的數量大於 spark.shuffle.sort.bypassMergeThreshold
參數的值時(默認 200),啟用 bypass 機制。
普通運行機制
觸發條件:shuffle read task 的數量大於 spark.shuffle.sort.bypassMergeThreshold
(默認200)
原理及流程:
- 數據會先寫入一個內存數據結構中,此時根據不同的 shuffle 算子,可能選用不同的數據結構(Map or Array )。
- 如果是 reduceByKey 這種聚合類的 shuffle 算子,那么會選用 Map 數據結構,一邊通過 Map進行聚合,一邊寫入內存(這也是為什么某些情況下聚合類算子選用reduceByKey 替換groupbykey);
- 如果是普通的 shuffle算子如join,count等,那么會選用Array數據結構,直接寫入內存。
- 然后,每寫一條數據進入內存數據結構之后如果達到了某個臨界閾值,那么就會嘗試將內存數據結構中的數據溢寫到磁盤,然后清空內存數據結構。
- 在溢寫到磁盤文件之前,會先根據 key對內存數據結構中已有的數據進行排序。
- 排序過后,會分批將數據寫入磁盤文件。默認的 batch數量是10000條,也就是說,排序好的數據,會以每批 1萬條數據的形式通過 Java 的 BufferedOutputStream寫入磁盤文件。
- 一個 task 將所有數據寫入內存數據結構的過程中,會發生多次磁盤溢寫操作,產生多個臨時文件。最后會將之前所有的臨時磁盤文件都進行合並,這就是 merge 過程,此時會將之前所有臨時磁盤文件中的數據讀取出來,生成 1個文件+索引文件(標識了下游各個 task 的數據在文件中的 start offset 與 end offset)。
注:BufferedOutputStream 是 Java的緩沖輸出流,首先會將數據緩沖在內存中,當內存緩沖滿溢之后再一次寫入磁盤文件中,這樣可以減少磁盤 IO 次數,提升性能。
最終產生的文件數量等於上游task的數量,如上游task 有100個,下有task200個,也只會產生100個文件
對應spark參數如下:
參數 | 值 |
---|---|
spark.shuffle.manager | sort |
spark.shuffle.sort.bypassMergeThreshold | 默認200 |
bypass 運行機制
觸發條件如下:
- 沒有map端聚合(比如 reduceByKey)。
- 最多有
spark.shuffle.sort.bypassMergeThreshold
reduce分區。
原理及流程:
- 上游task 會為每個下游 task 都創建一個內存緩沖,並根據 key 的 hash 值寫入對應的緩沖區。
- 緩沖區滿之后溢寫到磁盤文件的。
- 最后,將所有臨時磁盤文件都合並成一個磁盤文件,並創建一個單獨的索引文件。
對比未經優化的 HashShuffleManager:
其實前面的步驟和未經優化的 HashShuffleManager是一摸一樣額,只是最后多了一了merge的操作,產生的文件包括一個盤文件和一個索引文件。
最終磁盤文件的數量等於上游task的數量
shuffle 相關參數調優
參數來源:spark 官方文檔
此處只選擇部分參數解析
spark.shuffle.file.buffer
:32k
釋義:在shuffle write task 通過 BufferedOutputStream將數據寫到磁盤文件之前,會先寫入 buffer 緩沖中,待緩沖寫滿之后,才會溢寫到磁盤。次參數控制改緩沖區的大小。
建議:資源允許的情況下,可以設置的大一點,可以減少減少磁盤 IO 次數,提升性能。
spark.reducer.maxSizeInFlight
:48m
釋義:從每個reduce任務同時獲取的map輸出的最大大小,由於每個輸出都需要創建一個緩沖區來接收它,因此每個reduce任務的內存開銷都是固定的,所以要保持較小的內存,除非您有大量的內存。。
建議:資源允許的情況下,可以設置的大一點,可以減少網絡傳輸的次數,提升性能。
spark.shuffle.io.maxRetries
:3
釋義:(Netty only)shuffle read task 從 shuffle write task 所在節點拉取屬於自己的數據時,如果因為網絡異常導致拉取失敗,是會自動進行重試的,如果設置為非零值n,則會自動重試n次由於io相關異常而失敗的獲取。
這種重試邏輯有助於在面對長時間GC暫停或瞬態網絡連接問題時穩定大型改組。
建議:對於超大型的任務,建議調大該參數(比如30+)
spark.shuffle.io.retryWait
:5s
釋義:(Netty only)兩次讀取重試之間需要等待多長時間。默認情況下,重試導致的最大延遲為15秒,計算為maxRetries * retryWait。
建議:加大間隔時長(比如 60s),以增加 shuffle 操作的穩定性,對應的可能回導致任務執行時間加長。
spark.memory.useLegacyMode
:false
釋義:是否啟用Spark 1.5及以前使用的遺留內存管理模式。遺留模式嚴格地將堆空間划分為固定大小的區域,如果應用程序沒有進行調優,可能會導致過度溢出。
只有設置為true,會讀取下列三個已廢棄的內存部分配置:
spark.shuffle.memoryFraction
spark.storage.memoryFraction
spark.storage.unrollFraction
spark.shuffle.memoryFraction
:0.2
釋義:(不推薦)在spark.memory.useLegacyMode=true
時才會啟用,shuffle期間Executor 內存中用於聚合和組合的Java堆所占的比例,在任何給定時間,用於shuffle的所有內存映射的集合大小都受此限制,超過此限制,內容將開始溢出到磁盤。
建議:內存充足情況下,而且很少使用持久化操作,且溢出到磁盤頻繁,建議調高這個比例,給 shuffle read 的聚合操作更多內存,以避免由於內存不足導致聚合過程中頻繁讀寫磁盤。
spark.shuffle.manager
:sort
釋義:該參數用於設置 ShuffleManager 的類型。Spark 1.5以后,有三個可選項:hash、sort 和tungsten-sort(這個就是所謂的鎢絲計划,貌似出了很多bug,玩脫了,至少我沒怎么用)。HashShuffleManager 是 Spark 1.2 以前的默認選項,但是 Spark 1.2以及之后的版本默認都是 SortShuffleManager 了。
建議:維持原樣,通過后面的兩個參數控制是否排序。
spark.shuffle.sort.bypassMergeThreshold
:200
釋義:(高級)在基於排序的shuffle manager 中,如果沒有map端聚合,並且最多有這么多reduce分區,則避免合並排序數據。
建議:如果shuffle 的確不需要排序操作,可以將這個參數調大一些,大於 shuffle read task 的數量。會自動啟用 bypass 機制,map-side就不會進行排序了,減少了排序的性能開銷。但是這種方式下,依然會產生大量的磁盤文件(貌似不排序也沒少多少...但是時間會比排序少很多就是了)。
spark.shuffle.consolidateFiles
:false
釋義:如果設置為 true,那么就會開啟 consolidate 機制,會大幅度合並 shuffle write 的輸出文件,對於 shuffle read task數量特別多的情況下,這種方法可以極大地減少磁盤 IO 開銷,提升性能。
建議:如果確實不需要排序排序機制,那么設置spark.shffle.manager=hash
,測試發現性能比開啟了 bypass 機制的 SortShuffleManager要高(猜測可能是多了merge,和索引部分的操作,不過沒有驗證過),