1、Shuffle流程
spark的shuffle過程如下圖所示,和mapreduce中的類似,但在spark2.0及之后的版本中只存在SortShuffleManager而將原來的HashShuffleManager廢棄掉(但是shuffleWriter的子類BypassMergeSortShuffleWriter和已經被廢棄掉的HashShuffleWriter類似)。這樣,每個mapTask在shuffle的sort階段只會生成一個結果文件,單個文件按照partitionId分成多個region。reducer階段根據partitionId來fetch對應的region數據。
整個shuffle過程分為兩個階段,write(核心)和read階段,其中write階段比較重要的實現類為ExternalSorter(后面會重點分析該類)。

2、Shuffle Write
- BypassMergeSortShuffleWriter -
這種方式是對partition(對應的reduce)數量較少且不需要map-side aggregation的shuffle優化,將每個partition的數據直接寫到對應的文件,在所有數據都寫入完成后進行一次合並,下面是部分代碼:
[BypassMergeSortShuffleWriter]->write
public void write(Iterator<Product2<K, V>> records) throws IOException { ... partitionWriters = new DiskBlockObjectWriter[numPartitions]; /** 為每個partition創建一個DiskWriter用於寫臨時文件 **/
for (int i = 0; i < numPartitions; i++) { final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile = blockManager.diskBlockManager().createTempShuffleBlock(); final File file = tempShuffleBlockIdPlusFile._2(); final BlockId blockId = tempShuffleBlockIdPlusFile._1(); partitionWriters[i] = blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics); } ... /** 對每個record用對應的writer進行文件寫入操作 **/
while (records.hasNext()) { final Product2<K, V> record = records.next(); final K key = record._1(); partitionWriters[partitioner.getPartition(key)].write(key, record._2()); } //flush
for (DiskBlockObjectWriter writer : partitionWriters) { writer.commitAndClose(); } /** 構造最終的輸出文件實例,其中文件名為(reduceId為0): "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId 文件所在的local文件夾是根據該文件名的hash值確定。 1、如果運行在yarn上,yarn在啟動的時候會根據配置項'LOCAL_DIRS'在本地創建 文件夾 **/ File output = shuffleBlockResolver.getDataFile(shuffleId, mapId); //在實際結果文件名后加上uuid用於標識文件正在寫入,結束后重命名
File tmp = Utils.tempFileWith(output); try { //合並每個partition對應的文件到一個文件中
partitionLengths = writePartitionedFile(tmp); //將每個partition的offset寫入index文件方便reduce端fetch數據
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp); } finally { if (tmp.exists() && !tmp.delete()) { logger.error("Error while deleting temp file {}", tmp.getAbsolutePath()); } } mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths); }
-
UnsafeShuffleWriter(詳見project tungsten)
該writer可將數據序列化后寫入到堆外內存,只需要按照partitionid對地址進行排序,整個過程不涉及反序列化。
條件:
1、使用的序列化類需要支持object relocation.目前只能使用kryoSerializer
2、不需要map side aggregate即不能定義aggregator
3、partition數量不能大於支持的上限(2^24)
內存模型:
每條數據地址由一個64位的指針確定,其構成為:[24 bit partition number][13 bit memory page number][27 bit offset in page]
在內存為非8字節對齊的情況下,每個page的容量為227bits=128Mb,page總數為213,因此每個task可操作內存總量為:227*213bits=1Tb,在內存按字節對齊的情況下允許每個page的size有1g(即128*8,實際64位系統的內存都是8字節對齊的)的容量,數據存放在off heap上。在地址中加入partitionID 是為了排序階段只需要對record的地址排序。
4、Shuffle過程中涉及到的幾個參數
- spark.shuffle.sort.bypassMergeThreshold
當partition的數量小於該值並且不需要進行map-side aggregation時使用BypassMergeSortShuffleWriter來進行shuffle的write操作,默認值為200.
[SortShuffleWriter]->shouldBypassMergeSort
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = { if (dep.mapSideCombine) { require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") false } else { val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200) dep.partitioner.numPartitions <= bypassMergeThreshold } }``` - *spark.shuffle.compress*、*spark.shuffle.file.buffer*
**[DiskBlockObjectWriter]->open**
def open(): DiskBlockObjectWriter = { ... /** 'spark.shuffle.compress'-該參數決定是否對寫入文件的序列化數據進行壓縮。 'spark.shuffle.file.buffer'-設置buffer stream的buffersize,每write 一個byte時會檢查當前buffer容量,容量滿的時候則會flush到磁盤。該參數值在代碼中 會乘以1024轉換為字節長度。默認值為'32k',該值太大可能導致內存溢出。 **/ bs = compressStream(new BufferedOutputStream(ts, bufferSize)) ... }``` spark.file.transferTo 決定在使用BypassMergeWriter過程中,最后對文件進行合並時是否使用NIO方式進行file stream的copy。默認為true,在為false的情況下合並文件效率比較低(創建一個大小為8192的字節數組作為buffer,從in stream中讀滿后寫入out stream,單線程讀寫),版本號為2.6.32的linux內核在使用NIO方式會產生bug,需要將該參數設置為false。 spark.shuffle.spill.numElementsForceSpillThreshold 在使用UnsafeShuffleWriter時,如果內存中的數據超過這個值則對當前內存數據進行排序並寫入磁盤臨時文件。