Reference: 詳細探究Spark的shuffle實現 (A Detailed Exploration of Spark's Shuffle Implementation), which is very clearly written and explains how the current design came to be.
Hadoop
Hadoop's approach is as follows: on the mapper side, every time the in-memory buffer is nearly full, the data in memory is first divided by partition and each partition is spilled into its own small file; as the buffer keeps spilling, a large number of small files is produced.
So everything Hadoop does afterwards, right up to the reduce, is essentially continuous merging, i.e. a file-based multi-way merge sort: on the map side, data belonging to the same partition is merged together; on the reduce side, the data files copied from the mappers are merged to feed the final reduce.
The multi-way merge sort achieves two goals:
merge: all values of the same key end up in one ArrayList
sort: the final result is sorted by key
This scheme scales very well and has no trouble with large data sets; the cost is efficiency, since it requires several rounds of file-based multi-way merge sort, i.e. multiple rounds of reading from and writing to disk...
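To make the two goals concrete, here is a minimal Scala sketch of the k-way merge idea (an illustration under simplifying assumptions, not Hadoop's actual code): each spill run is modeled as an iterator of (key, value) pairs already sorted by key, and merging them yields the values of each key grouped together, in key order.

import scala.collection.mutable

// Each spill run is an iterator of key-sorted (key, value) pairs; merging them
// yields (key, values) groups in global key order.
def mergeSortedRuns[K, V](runs: Seq[Iterator[(K, V)]])(implicit ord: Ordering[K]): Iterator[(K, Seq[V])] = {
  // min-heap keyed on the current head key of each run
  val heap = mutable.PriorityQueue.empty[(K, V, Iterator[(K, V)])](
    Ordering.by[(K, V, Iterator[(K, V)]), K](_._1).reverse)
  for (run <- runs if run.hasNext) { val (k, v) = run.next(); heap.enqueue((k, v, run)) }

  new Iterator[(K, Seq[V])] {
    def hasNext: Boolean = heap.nonEmpty
    def next(): (K, Seq[V]) = {
      val key = heap.head._1
      val values = mutable.ArrayBuffer[V]()
      // drain every run whose head carries the same key, refilling the heap from that run
      while (heap.nonEmpty && ord.equiv(heap.head._1, key)) {
        val (_, v, run) = heap.dequeue()
        values += v
        if (run.hasNext) { val (k2, v2) = run.next(); heap.enqueue((k2, v2, run)) }
      }
      (key, values.toSeq)
    }
  }
}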
Spark
Spark's advantage is efficiency, so it skips the merge sort and thereby saves those repeated rounds of disk I/O.
Of course this brings scalability problems; it is hard to have it both ways.
Because there is no later merge step, the write phase has to keep corenum * bucketnum files open at the same time, and they can only be closed once writing is done.
And on the reduce side, since no merge was done earlier, a HashMap of all keys must be maintained in memory, merging and reducing on the fly; see the details below.
Write
How shuffle data is written into blocks: the key is the logic in ShuffleMapTask.
You can see that a shuffleBlockManager is used; starting with 0.8, Spark splits the ShuffleBlockManager off from the ordinary BlockManager to make it easier to optimize.
ShuffleMapTask
// Obtain all the block writers for shuffle blocks.
val ser = SparkEnv.get.serializerManager.get(dep.serializerClass)
shuffle = blockManager.shuffleBlockManager.forShuffle(dep.shuffleId, numOutputSplits, ser)  // create ShuffleBlocks for this shuffleId and the number of target partitions
buckets = shuffle.acquireWriters(partition)  // obtain a ShuffleWriterGroup, i.e. the target buckets (one per partition)

// Write the map output to its associated buckets.
for (elem <- rdd.iterator(split, taskContext)) {        // pull each element out of the RDD
  val pair = elem.asInstanceOf[Product2[Any, Any]]
  val bucketId = dep.partitioner.getPartition(pair._1)  // shuffle on the pair's key to get the target bucketId
  buckets.writers(bucketId).write(pair)                 // write the pair into that bucket's writer (BlockObjectWriter)
}

// Commit these buckets to blocks; other RDDs will locate the blocks via the shuffleId and read the data.
// Commit the writes. Get the size of each bucket block (total block size).
var totalBytes = 0L
val compressedSizes: Array[Byte] = buckets.writers.map { writer: BlockObjectWriter =>
  writer.commit()
  writer.close()
  val size = writer.size()
  totalBytes += size
  MapOutputTracker.compressSize(size)
}
ShuffleBlockManager
The core function of ShuffleBlockManager is forShuffle, which returns a ShuffleBlocks object.
The ShuffleBlocks object's acquireWriters function returns a ShuffleWriterGroup, which wraps the BlockObjectWriter for every target partition.
The problems here are the following.
Spark's scheduling is task based, and a task in fact corresponds to a partition;
if there are m partitions to be shuffled onto n partitions, that effectively means m mapper tasks and n reducer tasks.
Of course, in Spark not all mapper tasks run at once; task parallelism is bounded by the core number.
1. If every mapper task has to produce n files, the total number of files produced is n*m, which is far too many...
In Spark 0.8.1 this has been optimized with shuffle consolidation, i.e. multiple mapper tasks share one bucket file. How can they share?
It depends on the parallelism: concurrently running tasks cannot share a bucket file, so at least corenum * bucketnum files are still created, but tasks executed later can reuse the bucket files created earlier instead of creating new ones.
2. While the files are open for writing, each file's write handler needs a 100 KB in-memory buffer by default, so corenum * bucketnum * 100KB of memory is consumed at the same time; this problem has not been solved yet (see the rough sizing sketch after this discussion).
In other words, Spark runs into a scalability problem during shuffle. Why doesn't Hadoop hit the same problem?
Because Hadoop can tolerate multiple rounds of disk I/O and multiple rounds of file merging, it can write the buffer contents to a fresh file on every spill and merge the files afterwards.
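As a rough, purely hypothetical sizing example (the numbers below are assumptions, not from the source), this is what points 1 and 2 above translate to on a single 16-core worker:

// Hypothetical sizing: 1000 map tasks shuffling into 1000 reduce partitions
// on a 16-core worker, with the default 100 KB buffer per open writer.
val m = 1000            // map tasks (partitions of the parent RDD)
val n = 1000            // reduce partitions (buckets)
val cores = 16          // concurrently running map tasks per worker
val bufferKB = 100      // spark.shuffle.file.buffer.kb default

val filesWithoutConsolidation = m * n                        // 1,000,000 shuffle files
val filesWithConsolidation   = cores * n                     // 16,000 files on this worker; later tasks reuse them
val writerBufferMB           = cores * n * bufferKB / 1024   // ~1,562 MB of writer buffers still needed

The relevant ShuffleBlockManager code is shown below.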
private[spark] class ShuffleWriterGroup(val id: Int, val writers: Array[BlockObjectWriter])

private[spark] trait ShuffleBlocks {
  def acquireWriters(mapId: Int): ShuffleWriterGroup
  def releaseWriters(group: ShuffleWriterGroup)
}

private[spark] class ShuffleBlockManager(blockManager: BlockManager) {
  def forShuffle(shuffleId: Int, numBuckets: Int, serializer: Serializer): ShuffleBlocks = {
    new ShuffleBlocks {
      // Get a group of writers for a map task.
      override def acquireWriters(mapId: Int): ShuffleWriterGroup = {
        val bufferSize = System.getProperty("spark.shuffle.file.buffer.kb", "100").toInt * 1024
        val writers = Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>  // create one writer per target shuffle partition
          val blockId = ShuffleBlockManager.blockId(shuffleId, bucketId, mapId)    // blockId = "shuffle_" + shuffleId + "_" + mapId + "_" + bucketId
          blockManager.getDiskBlockWriter(blockId, serializer, bufferSize)         // obtain a DiskBlockWriter from the blockManager
        }
        new ShuffleWriterGroup(mapId, writers)
      }

      override def releaseWriters(group: ShuffleWriterGroup) = {
        // Nothing really to release here.
      }
    }
  }
}
Read
PairRDDFunctions.combineByKey
For this part, refer to Spark源碼分析 – PairRDD (Spark Source Code Analysis – PairRDD).
The key point is the processing on the reduce side (the branch without mapSideCombine is clearer to look at):
mapPartitions actually uses a MapPartitionsRDD, i.e. aggregator.combineValuesByKey is applied to the items of each partition.
The biggest difference from Hadoop, as you can see, is that Hadoop's reduce receives a collection in which each key has already been merged, so once a key is reduced its result can be written out immediately.
Spark has no such merge phase, so the data arrives pair by pair, and a HashMap over all keys must be kept in memory; this is a potential scalability problem, which Spark addresses with the external-sorting solution implemented in PR303.
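As an illustration, here is a simplified, in-memory-only sketch of what such a hash-based combineValuesByKey does (modeled loosely on Spark's Aggregator, not its actual code); note that the HashMap must hold every distinct key of the partition, which is exactly the scalability concern:

import scala.collection.mutable

// Simplified hash-based combine: every incoming pair is merged into an in-memory map.
class SimpleAggregator[K, V, C](createCombiner: V => C, mergeValue: (C, V) => C) {
  def combineValuesByKey(iter: Iterator[(K, V)]): Iterator[(K, C)] = {
    val combiners = mutable.HashMap[K, C]()     // must hold every distinct key of this partition
    for ((k, v) <- iter) {
      combiners(k) = combiners.get(k) match {
        case Some(c) => mergeValue(c, v)        // key already seen: fold the new value in
        case None    => createCombiner(v)       // first occurrence of this key
      }
    }
    combiners.iterator
  }
}

// Word-count style usage:
// new SimpleAggregator[String, Int, Int](v => v, _ + _).combineValuesByKey(pairs)

The actual combineByKey wiring in PairRDDFunctions is shown below.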
// When the RDD's own partitioner equals the partitioner passed in, no re-shuffle is needed; just map directly.
if (self.partitioner == Some(partitioner)) {
  self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)  // 2. mapPartitions: call combineValuesByKey directly on the map side
} else if (mapSideCombine) {  // map-side combine is requested
  val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)  // first do the map-side combine inside each partition
  val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner).setSerializer(serializerClass)  // 3. ShuffledRDD: perform the shuffle
  partitioned.mapPartitions(aggregator.combineCombinersByKey, preservesPartitioning = true)  // after the shuffle, combine once more on the reduce side, using combineCombinersByKey
} else {
  // Don't apply map-side combiner. The only difference from the branch above is that no map-side combine is done.
  val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
  values.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
}
ShuffledRDD
override def compute(split: Partition, context: TaskContext): Iterator[P] = {
  val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
  SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context.taskMetrics,  // use shuffleFetcher.fetch to get an iterator over the shuffled data
    SparkEnv.get.serializerManager.get(serializerClass))
}
ShuffleFetcher
First, the addresses of the shuffle partitions that need to be read are looked up from the mapOutputTracker (by shuffleId and reduceId).
Then a fetcher iterator over all of these blocks is obtained from the blockManager.
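A small illustration of how the two ends line up: the map side names each bucket block "shuffle_<shuffleId>_<mapId>_<bucketId>", so reducer reduceId needs exactly one block per map task, namely the one whose last component is its own id. The helper below is hypothetical and only meant to show the naming convention:

// Reducer `reduceId` needs exactly one block per map task (hypothetical helper):
def blocksNeededByReducer(shuffleId: Int, numMaps: Int, reduceId: Int): Seq[String] =
  (0 until numMaps).map(mapId => "shuffle_%d_%d_%d".format(shuffleId, mapId, reduceId))

// e.g. blocksNeededByReducer(shuffleId = 0, numMaps = 3, reduceId = 1)
// => Seq("shuffle_0_0_1", "shuffle_0_1_1", "shuffle_0_2_1")

The ShuffleFetcher interface and its BlockStoreShuffleFetcher implementation follow.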
private[spark] abstract class ShuffleFetcher {
  /**
   * Fetch the shuffle outputs for a given ShuffleDependency.
   * @return An iterator over the elements of the fetched shuffle outputs.
   */
  def fetch[T](shuffleId: Int, reduceId: Int, metrics: TaskMetrics,  // reduceId is simply the partition id on the reduce side
      serializer: Serializer = SparkEnv.get.serializerManager.default): Iterator[T]

  /** Stop the fetcher */
  def stop() {}
}
private[spark] class BlockStoreShuffleFetcher extends ShuffleFetcher with Logging {
  override def fetch[T](shuffleId: Int, reduceId: Int, metrics: TaskMetrics, serializer: Serializer)
    : Iterator[T] =
  {
    val blockManager = SparkEnv.get.blockManager

    val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)  // query the mapOutputTracker for the (BlockManagerId, size) of every map output of this shuffle

    // Several map tasks may run on the same node and thus share a BlockManagerId, so they need to be grouped by address.
    val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
    for (((address, size), index) <- statuses.zipWithIndex) {  // here index is the partition id on the map side
      splitsByAddress.getOrElseUpdate(address, ArrayBuffer()) += ((index, size))  // {BlockManagerId -> ((mapPartitionId, size), …)}
    }
    val blocksByAddress: Seq[(BlockManagerId, Seq[(String, Long)])] = splitsByAddress.toSeq.map {  // (BlockManagerId, (block file name, size))
      case (address, splits) =>
        (address, splits.map(s => ("shuffle_%d_%d_%d".format(shuffleId, s._1, reduceId), s._2)))  // the block file name is determined by shuffleId, mapPartitionId and reduceId
    }

    val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)  // Iterator of (block ID, value)
    val itr = blockFetcherItr.flatMap(unpackBlock)  // unpackBlock unwraps each (block ID, value), keeping the value, to produce the final iterator over the fetched data

    CompletionIterator[T, Iterator[T]](itr, {  // unlike a plain Iterator, the completion logic below is invoked once iteration finishes
      val shuffleMetrics = new ShuffleReadMetrics
      shuffleMetrics.shuffleFinishTime = System.currentTimeMillis
      shuffleMetrics.remoteFetchTime = blockFetcherItr.remoteFetchTime
      shuffleMetrics.fetchWaitTime = blockFetcherItr.fetchWaitTime
      shuffleMetrics.remoteBytesRead = blockFetcherItr.remoteBytesRead
      shuffleMetrics.totalBlocksFetched = blockFetcherItr.totalBlocks
      shuffleMetrics.localBlocksFetched = blockFetcherItr.numLocalBlocks
      shuffleMetrics.remoteBlocksFetched = blockFetcherItr.numRemoteBlocks
      metrics.shuffleReadMetrics = Some(shuffleMetrics)
    })
  }

  private def convertMapStatuses(
      shuffleId: Int,
      reduceId: Int,
      statuses: Array[MapStatus]): Array[(BlockManagerId, Long)] = {
    assert (statuses != null)
    statuses.map { status =>
      if (status == null) {
        throw new FetchFailedException(null, shuffleId, -1, reduceId,
          new Exception("Missing an output location for shuffle " + shuffleId))
      } else {
        (status.location, decompressSize(status.compressedSizes(reduceId)))  // the key conversion: decompressSize extracts only the size that belongs to this reduce partition
      }
    }
  }
}
Shuffle information registration - MapOutputTracker
One question was left open earlier: once the shuffle has completed, how does a reducer-side task know where to fetch all the shuffled blocks its partition needs?
In Hadoop this goes through the JobTracker: mappers report their status to the JobTracker via heartbeats, and reducers keep polling the JobTracker to learn which map output files they need to copy.
In Spark this is done by registering the shuffle information with the MapOutputTracker.
MapOutputTracker
First, every node may need to query shuffle information, so a MapOutputTrackerActor is needed for the communication.
See the logic in SparkContext: only on the master is the Actor object actually created; the slaves only create an Actor Ref (a sketch of this wiring follows below).
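A minimal sketch of this master/slave split, assuming an Akka version that still provides actorOf/actorFor; the function name, the driver URL and the exact place where Spark performs this wiring are simplifications, not the real code:

import akka.actor.{ActorRef, ActorSystem, Props}

// Assumed wiring: the driver hosts the real actor; executors only hold a remote ActorRef to it.
def connectMapOutputTracker(actorSystem: ActorSystem, isDriver: Boolean,
                            tracker: MapOutputTracker, driverUrl: String): ActorRef = {
  if (isDriver) {
    // master: create the actor that owns the authoritative mapStatuses
    actorSystem.actorOf(Props(new MapOutputTrackerActor(tracker)), name = "MapOutputTracker")
  } else {
    // slave: just look up a reference to the remote actor on the driver,
    // e.g. "akka://spark@<driverHost>:<port>/user/MapOutputTracker"
    actorSystem.actorFor(driverUrl)
  }
}

The MapOutputTrackerActor itself: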
private[spark] class MapOutputTrackerActor(tracker: MapOutputTracker) extends Actor with Logging {
  def receive = {
    case GetMapOutputStatuses(shuffleId: Int, requester: String) =>  // the interface provided for querying shuffle information
      logInfo("Asked to send map output locations for shuffle " + shuffleId + " to " + requester)
      sender ! tracker.getSerializedLocations(shuffleId)

    case StopMapOutputTracker =>
      logInfo("MapOutputTrackerActor stopped!")
      sender ! true
      context.stop(self)
  }
}
Note that only the MapOutputTracker on the master has all of the latest shuffle information.
For efficiency, however, a slave also buffers the shuffle information it obtained from the master, so getServerStatuses first looks in the local mapStatuses, and only if nothing is found there does it fetch from the remote master.
private[spark] class MapOutputTracker extends Logging {
  var trackerActor: ActorRef = _  // the MapOutputTrackerActor (on slaves, a ref to it)
  private var mapStatuses = new TimeStampedHashMap[Int, Array[MapStatus]]  // buffers all of the shuffle information

  def registerShuffle(shuffleId: Int, numMaps: Int) {  // register a shuffle id and initialize its Array[MapStatus]
    if (mapStatuses.putIfAbsent(shuffleId, new Array[MapStatus](numMaps)).isDefined) {
      throw new IllegalArgumentException("Shuffle ID " + shuffleId + " registered twice")
    }
  }

  def registerMapOutput(shuffleId: Int, mapId: Int, status: MapStatus) {  // when a map task finishes, register its MapOutput information
    var array = mapStatuses(shuffleId)
    array.synchronized {
      array(mapId) = status
    }
  }

  // Remembers which map output locations are currently being fetched on a worker
  private val fetching = new HashSet[Int]

  // Called on possibly remote nodes to get the server URIs and output sizes for a given shuffle
  def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
    val statuses = mapStatuses.get(shuffleId).orNull
    if (statuses == null) {  // nothing in the local mapStatuses
      logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
      var fetchedStatuses: Array[MapStatus] = null
      fetching.synchronized {
        if (fetching.contains(shuffleId)) {  // someone is already fetching it, so just wait
          // Someone else is fetching it; wait for them to be done
          while (fetching.contains(shuffleId)) {
            try {
              fetching.wait()
            } catch {
              case e: InterruptedException =>
            }
          }
        }
        // Either while we waited the fetch happened successfully, or
        // someone fetched it in between the get and the fetching.synchronized.
        fetchedStatuses = mapStatuses.get(shuffleId).orNull
        if (fetchedStatuses == null) {
          // We have to do the fetch, get others to wait for us.
          fetching += shuffleId  // still missing, so add it to fetching and do the fetch ourselves
        }
      }
      if (fetchedStatuses == null) {
        // We won the race to fetch the output locs; do so
        val hostPort = Utils.localHostPort()
        // This try-finally prevents hangs due to timeouts:
        try {
          val fetchedBytes = askTracker(GetMapOutputStatuses(shuffleId, hostPort)).asInstanceOf[Array[Byte]]  // fetch from the remote master
          fetchedStatuses = deserializeStatuses(fetchedBytes)
          logInfo("Got the output locations")
          mapStatuses.put(shuffleId, fetchedStatuses)  // buffer the result locally
        } finally {
          fetching.synchronized {
            fetching -= shuffleId
            fetching.notifyAll()
          }
        }
      }
      if (fetchedStatuses != null) {
        fetchedStatuses.synchronized {
          return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, fetchedStatuses)
        }
      } else {
        throw new FetchFailedException(null, shuffleId, -1, reduceId,
          new Exception("Missing all output locations for shuffle " + shuffleId))
      }
    } else {  // found locally, return directly
      statuses.synchronized {
        return MapOutputTracker.convertMapStatuses(shuffleId, reduceId, statuses)
      }
    }
  }
Registration
All of the registration work is done by the DAGScheduler on the master.
Spark identifies each shuffle with a shuffleId; unlike Hadoop, a single job may contain several shuffles, so a jobId cannot be used for this.
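A minimal sketch of the idea behind shuffleId allocation (assumed and simplified, not Spark's exact code): the driver hands out a fresh id from a counter each time a ShuffleDependency is created, so several shuffles within one job get distinct ids.

import java.util.concurrent.atomic.AtomicInteger

// Hypothetical driver-side counter illustrating per-shuffle id allocation.
object ShuffleIds {
  private val nextShuffleId = new AtomicInteger(0)
  def newShuffleId(): Int = nextShuffleId.getAndIncrement()
}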
Registration happens in two steps:
1. When a new stage is created, the shuffleId must be registered, since a new stage is always created because a shuffleDep has been encountered.
private def newStage(
    rdd: RDD[_],
    shuffleDep: Option[ShuffleDependency[_,_]],
    jobId: Int,
    callSite: Option[String] = None)
  : Stage =
{
  if (shuffleDep != None) {
    // Kind of ugly: need to register RDDs with the cache and map output tracker here
    // since we can't do it in the RDD constructor because # of partitions is unknown
    mapOutputTracker.registerShuffle(shuffleDep.get.shuffleId, rdd.partitions.size)  // register the shuffleId together with the number of partitions of the map-side RDD
  }
2. When the TaskCompletion event is handled: once a ShuffleMapTask finishes, i.e. its map output has been produced, the MapStatus(BlockManagerId, compressedSizes) can be registered.
With the BlockManagerId plus the map partition id and the reduce id, the blockId is known, and the data can be read.
private def handleTaskCompletion(event: CompletionEvent) {
  event.reason match {
    case Success =>
      task match {
        case rt: ResultTask[_, _] =>
        case smt: ShuffleMapTask =>
          val status = event.result.asInstanceOf[MapStatus]  // ShuffleMapTask.run itself returns a MapStatus, so only a cast is needed here
          val execId = status.location.executorId            // class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
          if (failedEpoch.contains(execId) && smt.epoch <= failedEpoch(execId)) {
            logInfo("Ignoring possibly bogus ShuffleMapTask completion from " + execId)
          } else {
            stage.addOutputLoc(smt.partition, status)        // buffer the MapStatus in the stage's outputLocs
          }
          if (stage.shuffleDep != None) {
            // We supply true to increment the epoch number here in case this is a
            // recomputation of the map outputs. In that case, some nodes may have cached
            // locations with holes (from when we detected the error) and will need the
            // epoch incremented to refetch them.
            // TODO: Only increment the epoch number if this is not the first time
            // we registered these map outputs.
            mapOutputTracker.registerMapOutputs(             // register them on the mapOutputTracker's mapStatuses
              stage.shuffleDep.get.shuffleId,
              stage.outputLocs.map(list => if (list.isEmpty) null else list.head).toArray,
              changeEpoch = true)
          }