Spark Shuffle Write階段磁盤文件分析


這篇文章會詳細介紹,Sort Based Shuffle Write 階段是如何進行落磁盤的

流程分析

入口處:

org.apache.spark.scheduler.ShuffleMapTask.runTask

runTask對應的代碼為:

val manager = SparkEnv.get.shuffleManager writer = manager.getWriter[Any, Any]( dep.shuffleHandle, partitionId, context) writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get

這里manager 拿到的是

org.apache.spark.shuffle.sort.SortShuffleWriter

我們看他是如何拿到可以寫磁盤的那個sorter的。我們分析的線路假設需要做mapSideCombine

sorter = if (dep.mapSideCombine) { require(dep.aggregator.isDefined, "Map-side combine without Aggregator specified!") new ExternalSorter[K, V, C]( dep.aggregator, Some(dep.partitioner), dep.keyOrdering, de.serializer)

接着將map的輸出放到sorter當中:

sorter.insertAll(records)

其中insertAll 的流程是這樣的:

while (records.hasNext) { addElementsRead() kv = records.next() map.changeValue((getPartition(kv._1), kv._1), update) maybeSpillCollection(usingMap = true)}

里面的map 其實就是PartitionedAppendOnlyMap,這個是全內存的一個結構。當把這個寫滿了,才會觸發spill操作。你可以看到maybeSpillCollection在PartitionedAppendOnlyMap每次更新后都會被調用。

一旦發生spill后,產生的文件名稱是:

    "temp_shuffle_" + id

邏輯在這:

val (blockId, file) = diskBlockManager.createTempShuffleBlock() def createTempShuffleBlock(): (TempShuffleBlockId, File) = { var blockId = new TempShuffleBlockId(UUID.randomUUID()) while (getFile(blockId).exists()) { blockId = new TempShuffleBlockId(UUID.randomUUID()) } (blockId, getFile(blockId)) }

產生的所有 spill文件被被記錄在一個數組里:

  private val spills = new ArrayBuffer[SpilledFile]

迭代完一個task對應的partition數據后,會做merge操作,把磁盤上的spill文件和內存的,迭代處理,得到一個新的iterator,這個iterator的元素會是這個樣子的:

(p, mergeWithAggregation( iterators, aggregator.get.mergeCombiners, keyComparator, ordering.isDefined))

其中p 是reduce 對應的partitionId, p對應的所有數據都會在其對應的iterator中。

接着會獲得最后的輸出文件名:

val outputFile = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)

文件名格式會是這樣的:

 "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"

其中reduceId 是一個固定值NOOP_REDUCE_ID,默認為0。

然后開始真實寫入文件

  val partitionLengths = sorter.writePartitionedFile( blockId, context, outputFile)

寫入文件的過程過程是這樣的:

for ((id, elements) <- this.partitionedIterator) { if (elements.hasNext) { val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize, context.taskMetrics.shuffleWriteMetrics.get) for (elem <- elements) { writer.write(elem._1, elem._2) } writer.commitAndClose() val segment = writer.fileSegment() lengths(id) = segment.length } }
剛剛我們說了,這個 this.partitionedIterator 其實內部元素是reduce partitionID -> 實際record 的 iterator,所以它其實是順序寫每個分區的記錄,寫完形成一個fileSegment,並且記錄偏移量。這樣后續每個的reduce就可以根據偏移量拿到自己需要的數據。對應的文件名,前面也提到了,是:
"shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".data"

剛剛我們說偏移量,其實是存在內存里的,所以接着要持久化,通過下面的writeIndexFile來完成:

shuffleBlockResolver.writeIndexFile(
           dep.shuffleId,
           mapId, 
          partitionLengths)

具體的文件名是:

  "shuffle_" + shuffleId + "_" + mapId + "_" + NOOP_REDUCE_ID + ".index"

至此,一個task的寫入操作完成,對應一個文件。

最終結論

所以最后的結論是,一個Executor 最終對應的文件數應該是:

MapNum (注:不包含index文件)

同時持有並且會進行寫入的文件數最多為::

 CoreNum

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM