徹底搞懂spark的shuffle過程(shuffle write)


什么時候需要 shuffle writer

假如我們有個 spark job 依賴關系如下

我們抽象出來其中的rdd和依賴關系:


E <-------n------,                    C <--n---D---n-----F--s---,                            A <-------s------ B <--n----`-- G

對應的划分后的RDD結構為:

最終我們得到了整個執行過程:

 

中間就涉及到shuffle 過程,前一個stage 的 ShuffleMapTask 進行 shuffle write, 把數據存儲在 blockManager 上面, 並且把數據位置元信息上報到 driver 的 mapOutTrack 組件中, 下一個 stage 根據數據位置元信息, 進行 shuffle read, 拉取上個stage 的輸出數據。

這篇文章講述的就是其中的 shuffle write 過程。

spark shuffle 演進的歷史

  • Spark 0.8及以前 Hash Based Shuffle

  • Spark 0.8.1 為Hash Based Shuffle引入File Consolidation機制

  • Spark 0.9 引入ExternalAppendOnlyMap

  • Spark 1.1 引入Sort Based Shuffle,但默認仍為Hash Based Shuffle

  • Spark 1.2 默認的Shuffle方式改為Sort Based Shuffle

  • Spark 1.4 引入Tungsten-Sort Based Shuffle

  • Spark 1.6 Tungsten-sort並入Sort Based Shuffle

  • Spark 2.0 Hash Based Shuffle退出歷史舞台

總結一下, 就是最開始的時候使用的是 Hash Based Shuffle, 這時候每一個Mapper會根據Reducer的數量創建出相應的bucket,bucket的數量是M R ,其中M是Map的個數,R是Reduce的個數。這樣會產生大量的小文件,對文件系統壓力很大,而且也不利於IO吞吐量。后面忍不了了就做了優化,把在同一core上運行的多個Mapper 輸出的合並到同一個文件,這樣文件數目就變成了 cores R 個了,

舉個例子:

本來是這樣的,3個 map task, 3個 reducer, 會產生 9個小文件,

是不是很恐怖, 后面改造之后

4個map task, 4個reducer, 如果不使用 Consolidation機制, 會產生 16個小文件。

但是但是現在這 4個 map task 分兩批運行在 2個core上, 這樣只會產生 8個小文件

在同一個 core 上先后運行的兩個 map task的輸出, 對應同一個文件的不同的 segment上, 稱為一個 FileSegment, 形成一個 ShuffleBlockFile,

后面就引入了  Sort Based Shuffle, map端的任務會按照Partition id以及key對記錄進行排序。同時將全部結果寫到一個數據文件中,同時生成一個索引文件, 再后面就就引入了 Tungsten-Sort Based Shuffle, 這個是直接使用堆外內存和新的內存管理模型,節省了內存空間和大量的gc, 是為了提升性能。

現在2.1 分為三種writer, 分為 BypassMergeSortShuffleWriter, SortShuffleWriter 和 UnsafeShuffleWriter,顧名思義,大家應該可以對應上,我們本着過時不講的原則, 本文中只描述這三種 writer 的實現細節, Hash Based Shuffle 已經退出歷史舞台了,我就不講了。

三種 writer 的分類

 

上面是使用哪種 writer 的判斷依據, 是否開啟 mapSideCombine 這個判斷,是因為有些算子會在 map 端先進行一次 combine, 減少傳輸數據。 因為 BypassMergeSortShuffleWriter 會臨時輸出Reducer個(分區數目)小文件,所以分區數必須要小於一個閥值,默認是小於200。

UnsafeShuffleWriter需要Serializer支持relocation,Serializer支持relocation:原始數據首先被序列化處理,並且再也不需要反序列,在其對應的元數據被排序后,需要Serializer支持relocation,在指定位置讀取對應數據。

BypassMergeSortShuffleWriter 實現細節

BypassMergeSortShuffleWriter和Hash Shuffle中的HashShuffleWriter實現基本一致, 唯一的區別在於,map端的多個輸出文件會被匯總為一個文件。 所有分區的數據會合並為同一個文件,會生成一個索引文件,是為了索引到每個分區的起始地址,可以隨機 access 某個partition的所有數據。

但是需要注意的是,這種方式不宜有太多分區,因為過程中會並發打開所有分區對應的臨時文件,會對文件系統造成很大的壓力。

具體實現就是給每個分區分配一個臨時文件,對每個 record的key 使用分區器(模式是hash,如果用戶自定義就使用自定義的分區器)找到對應分區的輸出文件句柄,直接寫入文件,沒有在內存中使用 buffer。 最后copyStream方法把所有的臨時分區文件拷貝到最終的輸出文件中,並且記錄每個分區的文件起始寫入位置,把這些位置數據寫入索引文件中。

SortShuffleWriter 實現細節

我們可以先考慮一個問題,假如我有 100億條數據,但是我們的內存只有1M,但是我們磁盤很大, 我們現在要對這100億條數據進行排序,是沒法把所有的數據一次性的load進行內存進行排序的,這就涉及到一個外部排序的問題,我們的1M內存只能裝進1億條數據,每次都只能對這 1億條數據進行排序,排好序后輸出到磁盤,總共輸出100個文件,最后怎么把這100個文件進行merge成一個全局有序的大文件。我們可以每個文件(有序的)都取一部分頭部數據最為一個 buffer, 並且把這 100個 buffer放在一個堆里面,進行堆排序,比較方式就是對所有堆元素(buffer)的head元素進行比較大小, 然后不斷的把每個堆頂的 buffer 的head 元素 pop 出來輸出到最終文件中, 然后繼續堆排序,繼續輸出。如果哪個buffer 空了,就去對應的文件中繼續補充一部分數據。最終就得到一個全局有序的大文件。

如果你能想通我上面舉的例子,就差不多搞清楚sortshufflewirter的實現原理了,因為解決的是同一個問題。

SortShuffleWriter 中的處理步驟就是

  • 使用 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 在內存中進行排序,  排序的 K 是(partitionId, hash(key)) 這樣一個元組。

  • 如果超過內存 limit, 我 spill 到一個文件中,這個文件中元素也是有序的,首先是按照 partitionId的排序,如果 partitionId 相同, 再根據 hash(key)進行比較排序

  • 如果需要輸出全局有序的文件的時候,就需要對之前所有的輸出文件 和 當前內存中的數據結構中的數據進行  merge sort, 進行全局排序

和我們開始提的那個問題基本類似,不同的地方在於,需要對 Key 相同的元素進行  aggregation, 就是使用定義的 func 進行聚合, 比如你的算子是 reduceByKey(+), 這個func 就是加法運算, 如果兩個key 相同, 就會先找到所有相同的key 進行 reduce(+) 操作,算出一個總結果 Result,然后輸出數據(K,Result)元素。

SortShuffleWriter 中使用 ExternalSorter 來對內存中的數據進行排序,ExternalSorter內部維護了兩個集合PartitionedAppendOnlyMap、PartitionedPairBuffer, 兩者都是使用了 hash table 數據結構, 如果需要進行 aggregation, 就使用 PartitionedAppendOnlyMap(支持 lookup 某個Key,如果之前存儲過相同key的K-V 元素,就需要進行 aggregation,然后再存入aggregation后的 K-V), 否則使用 PartitionedPairBuffer(只進行添K-V 元素),

我們可以看上圖, PartitionedAppendOnlyMap 中的 K 是(PatitionId, K)的元組, 這樣就是先按照partitionId進行排序,如果 partitionId 相同,再按照  hash(key)再進行排序。

 

首先看下  AppendOnlyMap, 這個很簡單就是個 hash table,其中的 K 是(PatitionId, hash(Key))的元組, 當要 put(K, V) 時,先 hash(K) 找存放位置,如果存放位置已經被占用,就使用 Quadratic probing 探測方法來找下一個空閑位置。對於圖中的 K6 來說,第三次查找找到 K4 后面的空閑位置,放進去即可。get(K6) 的時候類似,找三次找到 K6,取出緊挨着的 V6,與先來的 value 做 func,結果重新放到 V6 的位置。

下面看下  ExternalAppendOnlyMap 結構, 這個就是內存中裝不下所有元素,就涉及到外部排序,

上圖中可以看到整個原理圖,邏輯也很簡單, 內存不夠的時候,先spill了四次,輸出到文件中的元素都是有序的,讀取的時候都是按序讀取,最后跟內存剩余的數據進行 全局merge。

merge 過程就是 每個文件讀取部分數據(StreamBuffer)放到 mergeHeap 里面, 當前內存中的 PartitionedAppendOnlyMap 也進行 sort,形成一個 sortedMap 放在 mergeHeap 里面,  這個 heap 是一個 優先隊列 PriorityQueue, 並且自定義了排序方式,就是取出堆元素StreamBuffer的head元素進行比較大小,

val heap = new mutable.PriorityQueue[Iter]()(new Ordering[Iter] { // Use the reverse of comparator.compare because PriorityQueue dequeues the max
      override def compare(x: Iter, y: Iter): Int = -comparator.compare(x.head._1, y.head._1) })

這樣的話,每次從堆頂的 StreamBuffer 中 pop 出的 head 元素就是全局最小的元素(記住是按照(partitionId,hash(Key))排序的), 如果需要 aggregation, 就把這些key 相同的元素放在一個一個 mergeBuffers 中,  第一個被放入 mergeBuffers 的 StreamBuffer 被稱為 minBuffer,那么 minKey 就是 minBuffer 中第一個 record 的 key。當 merge-combine 的時候,與 minKey 有相同的Key的records 被 aggregate 一起,然后輸出。

如果不需要 aggregation, 那就簡單了, 直接把 堆頂的 StreamBuffer 中 pop 出的 head 元素 返回就好了。

最終讀取的時候,從整個 全局 merge 后的讀取迭代器中讀取的數據,就是按照 partitionId 從小到大排序的數據, 讀取過程中使用再按照 分區分段, 並且記錄每個分區的文件起始寫入位置,把這些位置數據寫入索引文件中。

UnsafeShuffleWriter 實現細節

UnsafeShuffleWriter 里面維護着一個 ShuffleExternalSorter, 用來做外部排序,  外部排序就是要先部分排序數據並把數據輸出到磁盤,然后最后再進行merge 全局排序, 既然這里也是外部排序,跟 SortShuffleWriter 有什么區別呢, 這里只根據 record 的 partition id 先在內存 ShuffleInMemorySorter 中進行排序, 排好序的數據經過序列化壓縮輸出到換一個臨時文件的一段,並且記錄每個分區段的seek位置,方便后續可以單獨讀取每個分區的數據,讀取流經過解壓反序列化,就可以正常讀取了。

整個過程就是不斷地在 ShuffleInMemorySorter 插入數據,如果沒有內存就申請內存,如果申請不到內存就 spill 到文件中,最終合並成一個 依據 partition id 全局有序 的大文件。

SortShuffleWriter 和  UnsafeShuffleWriter 對比

區別 UnsafeShuffleWriter SortShuffleWriter
排序方式 最終只是 partition 級別的排序 先 partition 排序,相同分區 key有序
aggregation 沒有反序列化,沒有aggregation 支持 aggregation

使用 UnsafeShuffleWriter 的條件

  • 沒有指定 aggregation 或者key排序, 因為 key 沒有編碼到排序指針中,所以只有 partition 級別的排序

  • 原始數據首先被序列化處理,並且再也不需要反序列,在其對應的元數據被排序后,需要Serializer支持relocation,在指定位置讀取對應數據。 KryoSerializer 和 spark sql 自定義的序列化器 支持這個特性。

  • 分區數目必須小於 16777216 ,因為 partition number 使用24bit 表示的。

  • 因為每個分區使用 27 位來表示 record offset, 所以一個 record 不能大於這個值。

內存排序並輸出文件

我們不妨看向對記錄排序的例子。一個標准的排序步驟需要為記錄儲存一組的指針,並使用quicksort 來互換指針直到所有記錄被排序。基於順序掃描的特性,排序通常能獲得一個不錯的緩存命中率。然而,排序一組指針的緩存命中率卻很低,因為每個比較運算都需要對兩個指針解引用,而這兩個指針對應的卻是內存中兩個隨機位置的數據。

那么,我們該如何提高排序中的緩存本地性?其中一個方法就是通過指針順序地儲存每個記錄的sort key。我們使用 8個字節(partition id 作為 key, 和數據真正的指針)來代表一條數據,放在一個 sort array 中,每次對比排序的操作只需要線性的查找每對pointer-key,從而不會產生任何的隨機掃描。 這樣如果對所有記錄的 partion 進行排序的時候, 直接對這個數據里面的進行排序,就好了,極大的提高了性能。

當然 這里對數據排序, UnsafeShuffleWriter 使用的是 RadixSort, 這個很簡單,我就不介紹了, 不同清楚的可以參考下 這個文檔 http://bubkoo.com/2014/01/15/sort-algorithm/radix-sort/

上面是申請內存的過程,申請到的內存作為 一個 page 記錄在  allocatedPages 中,spill的時候進行 free 這些內存, 有一個當前使用的 currentPage, 如果不夠用了,就繼續去申請。

大家可以看下上面的圖, 每次插入一條 record 到page 中, 就把 partionId + pageNumber + offset in page, 作為一個元素插入到 LongArray 中, 最終讀取數據的時候, 對LongArray 進行 RadixSort 排序,  排序后依次根據指針元素索引原始數據,就做到 partition 級別有序了。

spill 文件的時候, UnsafeShuffleInMemorySorter 生成一個數據迭代器, 會返回一個根據partition id 排過序迭代器,該迭代器粒度每個元素就是一個指針,對應 PackedRecordPointer 這個數據結構, 這個 PackedRecordPointer 定義的數據結構就是  [24 bit partition number][13 bit memory page number][27 bit offset in page]  然后到根據該指針可以拿到真實的record, 在一開始進入UnsafeShuffleExternalSorter 就已經被序列化了,所以在這里就純粹變成寫字節數組了。一個文件里不同的partiton的數據用fileSegment來表示,對應的信息存在 SpillInfo 數據結構中。

合並文件

每個spill 文件的分區索引都保存在 SpillInfo 數據結構中, Task結束前,我們要做一次mergeSpills操作, 如果 fastMergeEnabled  並且壓縮方式支持 concatenation of compressed data, 就可以直接 簡單地連接相同分區的壓縮數據到一起,而且不用解壓反序列化。使用一種高效的數據拷貝技術,比如  NIO’s transferTo 就可以避免解壓和 buffer 拷貝。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM