Shuffle是性能調優的重點,Shuffle是為了匯聚有共同特征的一類數據到一個計算節點上進行計算。
Shuffle過程非常復雜:數據量大、將數據匯聚到正確的Partition和節點時產生多次磁盤交互、節省帶寬而無可避免的壓縮、網絡傳輸所需的序列化
Shuffle需要持久化計算的中間結果,因為一旦數據丟失就要重新計算所有依賴的RDD
所以主要分析如何持久化(Shuffle Write),使下游的Task順利獲得(Shuffle Read),Shuffle Map Task是如何計算結果的
Hash Based Shuffle Write
Spark1.0只支持這種shuffle,因為不需要運用到排序。
每個Shuffle Map Task根據key的哈希值,計算每個key需要寫入的partition,將數據單獨寫入一個文件,這個partition對應了下游的shuffle map task或者result task,下游的task讀取這個文件並進行計算。
Basic Shuffle Writer
在Executor上執行shuffle Map Task時,最終調用了scheduler.ShuffleMapTask#runTask,其核心邏輯為:
1 從SparkEnv中獲得shuffleManager,不僅支持Hash和Sort,還支持自定義的shuffle
2 從manager中取得writer,hash情況下獲得的是shuffle.hash.HashShuffleWriter
3 調用rdd運算,運算結果通過writer進行持久化,也就是HashShuffleWriter#write,通過ShuffleDependency定義是否有Aggregator來確定是否作Map端聚合,將原始結果或聚合結果通過shuffle.FileShuffleBlockManager#forMapTask方法寫入,寫完后將元數據信息寫入scheduler.MapStatus,下游的Task根據MapStatus取得需要處理的數據。

書本還對步驟3進行了詳細的解釋
存在的問題:
每個Shuffle Map Task需要為每個下游的Task創建獨立的文件,這樣會導致文件數量很多
1 每個節點同時打開多個文件,需要巨大的內存
2 隨機寫入硬盤的性能比順序寫入差
Shuffle Consolidate Writer
解決Shuffle過程中產生文件過多的問題。在Spark0.8.1加入Shuffle Consolidate Files機制。將shuffle.consolidateFiles設置為true開啟。
也就是運行在同一個core里的shuffle map task,會共用一個文件,由第一個task創建,之后的task都追加在這個文件上,當shuffle map task的數量明顯多於core時,此種writer就比basic的文件數少很多

源碼上,與basic不同的地方就是forMaskTask#writers的實現不同。在同一個core上運行的shuffle map task相當於寫了這個文件的不同部分。使用這種writer時下游的task通過shuffle.FileShuffleBlockManager.ShuffleFileGroup#getFileSegmentFor來區分。
雖然Shuffle Consolidate機制緩解了文件過多的問題,但沒辦法解決同時打開多個文件造成內存使用過大,以及隨機讀影響性能的問題。
為了解決hash based shuffle的缺陷:
spark1.0開始建立了shuffle pluggable,可自定義external shuffle service
spark1.1實現了sort based shuffle,1.2版本sort based shuffle成為shuffle默認選項
Shuffle Pluggable框架:
需要實現新的shuffle機制,就要實現以下接口:

org.apache.spark.shuffle.ShuffleManager
Driver和Executor都有一個ShuffleManager,通過配置項spark.shuffle.manager制定,由SparkEnv創建。
Driver的ShuffleManager負責注冊Shuffle元數據(shuffleId、Map Task數量),Executor的ShuffleManager負責讀寫Shuffle數據
需要實現的功能有:
1 Driver注冊元數據信息
2 獲得Shuffle Writer,根據Shuffle Map Task的ID創建Shuffle Writer
3 獲得Shuffle Reader,根據shuffleId和Partition的ID創建Shuffle Reader
4 為數據成員shuffleBlockManager賦值,保存實際的ShuffleBlockManager
5 刪除本地的Shuffle元數據
6 停止Shuffle Manager
源碼可看shuffle.sort.SortShuffleManager和hash.HashShuffleManger兩種shuffle方案的做法
org.apache.spark.shuffle.ShuffleBlockManager
主要是從本地讀取Shuffle數據的功能,接口通過spark.storage.BlockManager調用
主要功能有:
1 def getBytes,輸入ShuffleBlockId,輸出ByteBuffer
2 def getBlockData,輸入ShuffleBlocked,輸出ManagedBuffer,就是用來讀取的方法
3 def stop,停止Manager
源碼看shuffle.IndexShuffleBlockManager和shuffle.FileShuffleBlockManager
org.apache.spark.shuffle.ShuffleReader
實現了下游Task如何讀取上游ShuffleMapTask的輸出,通過spark.MapOutputTracker獲得數據位信息,在本地則調用spark.storage.BlockManager的getBlockManager(其實也調用了shuffle.ShuffleBlockManager的getBlockManager)
只要實現了上面的模塊就能實現shuffle機制,實現時要考慮到超大規模數據場景下的性能問題和資源消耗問題
Sort Based Write
1.2中默認為Sort Based Write,shuffle manager從hash換成了sort
Hash based shuffle設計目的是避免排序,但處理超大規模數據集時產生了大量的IO和內存消耗,而sort based shuffle將所有結果寫到一個文件里,同時生成一個index文件,reducer通過index文件取得需要處理的數據,這樣就節省了內存的使用和disk帶來的高延時,減少了GC的風險和頻率,避免同時讀寫多個文件的壓力。
Shuffle Map Task按照key相應的partition ID進行sort,同一個partition的key不進行sort

這個過程中內存若不夠用,就把排序好的內容寫入外部存儲,結束時進行一次歸並排序
Index文件記錄不同partition的位置信息,spark.storage.BlockManager實現了這種尋址方式,task能夠獲取到所需的partition
SortShuffleWriter的實現邏輯:
1 對於每個Partition創建一個scala.Array存儲key/value對
2 scala.Array超過閾值時將內存數據寫入外部存儲文件,文件開始部分記錄partition ID和數據條數
3 將所有外部存儲的文件進行歸並排序,一般每次同時打開10~100個文件,太多會產生內存溢出或垃圾回收,太少會提高延時
4 生成數據文件時同時生成Index索引文件,記錄不同partition的起始位置
至於SortShuffleManager書本上沒有詳細講,可在gitHub的apache.spark.sparkCore中看源碼。
Shuffle Map Task運算結果處理:
分為兩部分:Executor端直接處理Task結果、Driver端接到Task運行結束信息時對Shuffle Write的結果處理,使下游Task能得到需要的數據
與第6章executor的工作類似,就不重復看了
Shuffle Read
在Spark1.2中,不管Hash Based Shuffle還是Sort Based Shuffle,內置的Shuffle Reader都是spark.shuffle.hash.HashShuffleReader
整體流程是從ShuffledRDD#compute開始,調用ShuffleManager的getReader方法獲取到ShuffleReader,調用read()方法進行讀取。
storage.ShuffleBlockFetcherIterator通過splitLocalRemoteBlocks划分數據讀取策略:
如果數據在本地,可以直接從BlockManager獲取,如果需要從其他節點獲取,就通過網絡,由於Shuffle數據量很大,分為以下幾種網絡讀取策略:
1 每次最多啟動5個線程到最多5個節點上讀取數據
2 每次請求的數據大小不超過spark.reducer.maxMbInFlight(默認為48MB)的五分之一
避免目標機器占用過多帶寬,而且請求數據可以平行化
本地讀取:
FetchLocalBlocks()負責本地Block獲取,其實就是調用ShuffleBlockManager中的getBlockData方法
遠程讀取:
兩種方式,netty和nio,通過spark.shuffle.blockTransferService來設置
其中storage.ShuffleBlockFetcherIterator#sendRequest向遠程節點發起讀取Block請求
書中還有很多具體實現的源碼
性能調優:
spark.shuffle.manager
選擇hash還是sort,取決於內存、排序、文件操作綜合因素:
對於不需要排序且shuffle產生文件數量不多的情況下,hash更優,因為sort會按照reducer的partition進行排序浪費時間
sort的優勢在於可擴展性,還在不斷演進中
spark.shuffle.spill
參數默認為true,指定Shuffle過程中如果內存中的數據超過閾值是否將部分數據臨時寫入外部存儲。設置為false會一直使用內存,有內存溢出風險
為了防止內存溢出,hash包裝了spark.util.collection.ExternalAppendOnlyMap和spark.util.collection.ExternalSorter
Shuffle Read在聚合時也可能將數據寫入外部以防止內存溢出
spark.shuffle.memoryFraction和spark.shuffle.safetyFraction
spark.shuffle.memoryFraction決定shuffle使用的內存到達總內存多少比例時開始寫入外部存儲,默認為0.2,它影響了寫入外部存儲的頻率和垃圾回收的頻率
為了降低寫入外部存儲的頻率,適當提高spark.shuffle.memoryFraction,而且為了避免內存溢出,增加它時需要減少RDD cache的內存,即減少spark.storage.memoryFraction
為了處理實際使用的內存比估算值要大的情況,spark.shuffle.safetyFraction可降低實際shuffle過程中用到的內存,防止超出配置值
spark.shuffle.sort.bypassMergeThreshold
設置reducer的partition少於一定數量時(默認為200),sort based shuffle內部不使用歸並排序,而是直接將每個partition寫入單獨文件,類似於hash方式,區別在於最后還是會合成一個文件,並通過index文件標記位置
存在同時打開文件過多導致內存占用過大的風險,如果內存比較緊張,可降低這個值
spark.shuffle.blockTransferService
spark1.2默認為netty,之前版本是nio,netty實現更加簡潔
spark.shuffle.consolidateFiles
默認為false,處理hash方式產生過多文件的問題,若選為true,就是同一個core上運行的shuffle map task共用同一個文件。這個策略沒什么用,並不能減少內存消耗
Spark.shuffle.compress和spark.shuffle.spill.cpmpress
默認為true,都是設置shuffle過程中是否對數據進行壓縮的參數。前者針對最終寫入本地文件系統的輸出文件,而后者針對處理過程中需要寫入外部存儲的中間數據
前者主要考慮到網絡IO是否成為瓶頸,若網絡比較慢,則需要進行壓縮,如果計算是CPU密集型的,就設置為false
后者應對CPU的壓縮解壓時間和Disk IO的時間進行比較,若Disk IO較慢時應設置為true,若硬盤時SSD,false可能更快
spark.reducer.maxMbInFlight
限制Reducer Task向其他Executor請求Shuffle數據所占最大內存,默認為48MB,主要考慮網絡帶寬問題
總結:
Spark 0.6和0.7,shuffle結果都先放在內存中,導致了OC和OOM概率非常大。在0.8增加了shuffle結果寫入磁盤,並且為下游task生成單獨的文件,解決了結果都需要存入內存的問題,但引入了文件過多的問題。0.8.1引入FileConsolidation機制減少了文件數。1.0引入Shuffle Pluggabkle框架,允許自定義shuffle機制。1.1引入sort based shuffle。1.2版本sort變成了默認。