有許多場景下,我們需要進行跨服務器的數據整合,比如兩個表之間,通過Id進行join操作,你必須確保所有具有相同id的數據整合到相同的塊文件中。那么我們先說一下mapreduce的shuffle過程。
Mapreduce的shuffle的計算過程是在executor中划分mapper與reducer。Spark的Shuffling中有兩個重要的壓縮參數。spark.shuffle.compress true---是否將會將shuffle中outputs的過程進行壓縮。將spark.io.compression.codec編碼器設置為壓縮數據,默認是true.同時,通過spark.shuffle.manager 來設置shuffle時的排序算法,有hash,sort,tungsten-sort。(用hash會快一點,我不需要排序啊~)
Hash Shuffle
使用hash散列有很多缺點,主要是因為每個Map task都會為每個reduce生成一份文件,所以最后就會有M * R個文件數量。那么如果在比較多的Map和Reduce的情況下就會出問題,輸出緩沖區的大小,系統中打開文件的數量,創建和刪除所有這些文件的速度都會受到影響。如下圖:
這里有一個優化的參數spark.shuffle.consolidateFiles,默認為false,當設置成true時,會對mapper output時的文件進行合並。如果你集群有E個executors(“-num-excutors”)以及C個cores("-executor-cores”),以及每個task又T個CPUs(“spark.task.cpus”),那么總共的execution的slot在集群上的個數就是E * C / T(也就是executor個數×CORE的數量/CPU個數)個,那么shuffle過程中所創建的文件就為E * C / T * R(也就是executor個數 × core的個數/CPU個數×reduce個數)個。外文文獻寫的太公式化,那么我用通俗易懂的形式闡述下。就好比總共的並行度是20(5個executor,4個task) Map階段會將數據寫入磁盤,當它完成時,他將會以reduce的個數來生成文件數。那么每個executor就只會計算core的數量/cpu個數的tasks.如果task數量大於總共集群並行度,那么將開啟下一輪,輪詢執行。
速度較快,因為沒有再對中間結果進行排序,減少了reduce打開文件時的性能消耗。
當然,當數據是經過序列化以及壓縮的。當重新讀取文件,數據將進行解壓縮與反序列化,這里reduce端數據的拉取有個參數spark.reducer.maxSizeInFlight(默認為48MB),它將決定每次數據從遠程的executors中拉取大小。這個拉取過程是由5個並行的request,從不同的executor中拉取過來,從而提升了fetch的效率。 如果你加大了這個參數,那么reducers將會請求更多的文數據進來,它將提高性能,但是也會增加reduce時的內存開銷。
Sort Shuffle
Sort Shuffle如同hash shuffle map寫入磁盤,reduce拉取數據的一個性質,當在進行sort shuffle時,總共的reducers要小於spark.shuffle.sort.bypassMergeThrshold(默認為200),將會執行回退計划,使用hash將數據寫入單獨的文件中,然后將這些小文件聚集到一個文件中,從而加快了效率。(實現自BypassMergeSortShuffleWriter中)
那么它的實現邏輯是在reducer端合並mappers的輸出結果。Spark在reduce端的排序是用了TimSort,它就是在reduce前,提前用算法進行了排序。 那么用算法的思想來說,合並的M N個元素進行排序,那么其復雜度為O(MNlogM) 具體算法不講了~要慢慢看~
隨之,當你沒有足夠的內存保存map的輸出結果時,在溢出前,會將它們disk到磁盤,那么緩存到內存的大小便是 spark.shuffle.memoryFraction * spark.shuffle.safyFraction.默認的情況下是”JVM Heap Size * 0.2 * 0.8 = JVM Heap Size * 0.16”。需要注意的是,當你多個線程同時在一個executor中運行時(spark.executor.cores/spark.task.cpus 大於1的情況下),那么map output的每個task將會擁有 “JVM Heap Size * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction / spark.executor.cores * spark.task.cpus。運行原理如下圖:
使用此種模式,會比使用hashing要慢一點,可通過bypassMergeThreshold找到集群的最快平衡點。
Tungsten Sort
使用此種排序方法的優點在於,操作的二進制數據不需要進行反序列化。它使用 sun.misc.Unsafe模式進行直接數據的復制,因為沒有反序列化,所以直接是個字節數組。同時,它使用特殊的高效緩存器ShuffleExtemalSorter壓記錄與指針以及排序的分區id.只用了8 Bytes的空間的排序數組。這將會比使用CPU緩存要效率。
每個spill的數據、指針進行排序,輸出到一個索引文件中。隨后將這些partitions再次合並到一個輸出文件中。
本文翻譯自一位國外大神的博客:https://0x0fff.com/spark-memory-management/