概述
Shuffle是Spark Core比較復雜的模塊,它也是非常影響性能的操作之一。因此,在這里整理了會影響Shuffle性能的各項配置。
1)spark.shuffle.manager
Spark 1.2.0官方版本支持兩種方式的Shuffle,即Hash Based Shuffle和Sort Based Shuffle。其中在Spark 1.0之前僅支持Hash Based Shuffle。Spark 1.1引入了Sort Based Shuffle。Spark 1.2的默認Shuffle機制從Hash變成了Sort。如果需要Hash Based Shuffle,只需將spark.shuffle.manager設置成"hash"即可。
配置方式:
①進入spark安裝目錄的conf目錄
②cp spark-defaults.conf.template spark-defaults.conf
③spark.shuffle.manager=hash
應用場景:當產生的臨時文件不是很多時,性能可能會比sort shuffle要好。因為沒有任何的排序操作
如果對性能有比較苛刻的要求,那么就要理解這兩種不同的Shuffle機制的原理,結合具體的應用場景進行選擇。
對於不需要進行排序且Shuffle產生的文件數量不是特別多時,Hash Based Shuffle可能是更好的選擇;因為Sort Based Shuffle會按照Reducer的Partition進行排序。
而Sort Based Shuffle的優勢就在於可擴展性,它的出現實際上很大程度上是解決Hash Based Shuffle的可擴展性的問題。由於Sort Based Shuffle還在不斷地演進中,因此它的性能會得到不斷改善。
對於選擇哪種Shuffle,如果性能要求苛刻,最好還是通過實際測試后再做決定。不過選擇默認的Sort,可以滿足大部分的場景需要。
2)spark.shuffle.spill
這個參數的默認值是true,用於指定Shuffle過程中如果內存中的數據超過閾值(參考spark.shuffle.memoryFraction的設置)時是否需要將部分數據臨時寫入外部存儲。如果設置為false,那么這個過程就會一直使用內存,會有內存溢出的風險。因此只有在確定內存足夠使用時,才可以將這個選項設置為false。
3)spark.shuffle.memoryFraction
在啟用spark.shuffle.spill的情況下,spark.shuffle.memoryFraction決定了當Shuffle過程中使用的內存達到總內存多少比例的時候開始spill。在Spark 1.2.0里,這個值是0.2。
此參數可以適當調大,可以控制在0.4~0.6。
通過這個參數可以設置Shuffle過程占用內存的大小,它直接影響了寫入到外部存儲的頻率和垃圾回收的頻率。可以適當調大此值,可以減少磁盤I/O次數。
4)spark.shuffle.blockTransferService
在Spark 1.2.0中這個配置的默認值是netty,而在之前的版本中是nio。它主要是用於在各個Executor之間傳輸Shuffle數據。netty的實現更加簡潔,但實際上用戶不用太關心這個選項。除非有特殊需求,否則采用默認配置即可。
5)spark.shuffle.consolidateFiles
這個配置的默認值是false。主要是為了解決在Hash Based Shuffle過程中產生過多文件的問題。如果配置選項為true,那么對於同一個Core上運行的Shuffle Map Task不會產生一個新的Shuffle文件而是重用原來的。
6)spark.shuffle.compress和spark.shuffle.spill.compress
這兩個參數的默認配置都是true。spark.shuffle.compress和spark.shuffle.spill.compress都是用來設置Shuffle過程中是否對Shuffle數據進行壓縮。其中,前者針對最終寫入本地文件系統的輸出文件;后者針對在處理過程需要寫入到外部存儲的中間數據,即針對最終的shuffle輸出文件。
1. 設置spark.shuffle.compress
需要評估壓縮解壓時間帶來的時間消耗和因為數據壓縮帶來的時間節省。如果網絡成為瓶頸,比如集群普遍使用的是千兆網絡,那么將這個選項設置為true可能更合理;如果計算是CPU密集型的,那么將這個選項設置為false可能更好。
2. 設置spark.shuffle.spill.compress
如果設置為true,代表處理的中間結果在spill到本地硬盤時都會進行壓縮,在將中間結果取回進行merge的時候,要進行解壓。因此要綜合考慮CPU由於引入壓縮、解壓的消耗時間和Disk IO因為壓縮帶來的節省時間的比較。在Disk IO成為瓶頸的場景下,設置為true可能比較合適;如果本地硬盤是SSD,那么設置為false可能比較合適。
7)spark.reducer.maxMbInFlight
這個參數用於限制一個Result Task向其他的Executor請求Shuffle數據時所占用的最大內存數,默認是64MB。尤其是如果網卡是千兆和千兆以下的網卡時。默認值是設置這個值需要綜合考慮網卡帶寬和內存。