Apache Spark源碼走讀之20 -- ShuffleMapTask計算結果的保存與讀取


歡迎轉載,轉載請注明出處,徽滬一郎。

概要

ShuffleMapTask的計算結果保存在哪,隨后Stage中的task又是如何知道從哪里去讀取的呢,這個過程一直讓我困惑不已。

用比較通俗一點的說法來解釋一下Shuffle數據的寫入和讀取過程

  1. 每一個task負責處理一個特定的data partition
  2. task在初始化的時候就已經明確處理結果可能會產生多少個不同的data partition
  3. 利用partitioner函數,task將處理結果存入到不同的partition,這些數據存放在當前task執行的機器上
  4. 假設當前是stage 2有兩個task, stage 2可能輸出4個不同的data partition, task 0和task 1各自運行於不同的機器上,task 0中的部分處理結果會存入到data partition 0,task 1的部分處理結果也可能存入到data partition 0.
  5. 由於stage 2產生了4個不同的data partition, 后續stage 1中的task個數就為4. task 0 就負責讀取data partition 0的數據,對於(stage1, task0)來說,所要讀取的data partition 0的內容由task 0和task 1中的partition 0共同組成。
  6. 現在問題的關鍵轉換成為(stage_1, task_0)如何知道(stage_2, task_x)有沒有相應的輸出是屬於data partition 0的呢?這個問題的解決就是MapStatus
  7. 每一個ShuffleMapTask在執行結束,都會上報一個MapStatus,在MapStatus中會反應出朝哪些data partition寫入了數據,寫入了數據則size為非零值,否則為零值
  8. (stage_1,task_0)會去獲取stage_2中所有task的MapStatus,以判定(stage_2, task_x)產生的數據中有自己需要讀入的內容
  9. 假設(stage_1,task_0)知道(stage_2, task_0)生成了data partition 0中的數據,於是去(stage_2, task_0)運行時的機器去獲取具體的數據,如果恰巧這個時候遠端機器已經掛掉了,獲取失敗,怎么辦?
  10. 上報異常,由DAGScheduler重新調度(stage_2,task_0),重新生成所需要的數據。
  11. Spark不像Hadoop中的MapReduce有一個明顯的combine階段,在spark中combine過程有兩次調用,一是Shuffle數據寫入過程,另一個是Shuffle數據讀取過程。

如果能夠明白上述的過程,並對應到相應的代碼,那就無須看下述的詳細解釋了。

好了,讓我們開始代碼跟蹤吧。

數據寫入過程

數據寫入動作最原始的觸發點是ShuffleMapTask.runTask函數,看一看源碼先。

  override def runTask(context: TaskContext): MapStatus = {
    metrics = Some(context.taskMetrics)
    var writer: ShuffleWriter[Any, Any] = null
    try {
      val manager = SparkEnv.get.shuffleManager
      writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
      writer.write(rdd.iterator(split, context).asInstanceOf[Iterator[_ 
        if (writer != null) {
          writer.stop(success = false)
        }
        throw e
    } finally {
      context.executeOnCompleteCallbacks()
    }
  }

managerGetWriter返回的是HashShuffleWriter,所以調用過程是ShuffleMapTask.runTask->HashShuffleWriter.write->BlockObjectWriter.write. 注意dep.mapSideCombine這一分支判斷。ReduceByKey(_ + _)中的(_ + _)在此處被執行一次,另一次執行是在read過程。

  override def write(records: Iterator[_ <: Product2[K, V]]): Unit = {
    val iter = if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        dep.aggregator.get.combineValuesByKey(records, context)
      } else {
        records
      }
    } else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
      throw new IllegalStateException("Aggregator is empty for map-side combine")
    } else {
      records
    }

    for (elem <- iter) {
      val bucketId = dep.partitioner.getPartition(elem._1)
      shuffle.writers(bucketId).write(elem)
    }

HashShuffleWriter.write中主要處理兩件事

  1. 判斷是否需要進行聚合,比如<hello,1>和<hello,1>都要寫入的話,那么先生成<hello,2>然后再進行后續的寫入工作
  2. 利用Partitioner函數來決定<k,val>寫入到哪一個文件中

Partitioner是在什么時候注入的,RDD抽象類中,Partitioner為空?以reduceByKey為例,HashPartitioner會在后面combineByKey的代碼創建ShuffledRDD的時候作為ShuffledRDD的構造函數傳入。

  def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = {
    reduceByKey(new HashPartitioner(numPartitions), func)
  }

Stage在創建的時候通過構造函數入參明確需要從多少Partition讀取數據,生成的Partition會有多少。看一看Stage的構造函數,讀取的分區數目由RDD.partitions.size決定,輸出的partitions由shuffleDep決定。

private[spark] class Stage(
    val id: Int,
    val rdd: RDD[_],
    val numTasks: Int,
    val shuffleDep: Option[ShuffleDependency[_, _, _]],  // Output shuffle if stage is a map stage
    val parents: List[Stage],
    val jobId: Int,
    val callSite: CallSite)
extends Logging {
  val isShuffleMap = shuffleDep.isDefined
  val numPartitions = rdd.partitions.size
  val outputLocs = Array.fill[List[MapStatus]](numPartitions)(Nil)
  var numAvailableOutputs = 0
  private var nextAttemptId = 0

回到數據寫入的問題上來,結果寫入時的一個主要問題就是已經知道shuffle_id, map_id和要寫入的elem,如何找到對應的寫入文件。每一個臨時文件由三元組(shuffle_id,map_id,reduce_id)來決定,當前已經知道了兩個,還剩下一下reduce_id待確定。

reduce_id是使用partitioner計算出來的結果,輸入的是elem的鍵值。也就是dep.partitioner.getPartition(elem._1)。 根據計算出來的bucketid找到對應的writer,然后真正寫入。

在HashShuffleWriter.write中使用到的shuffle由ShuffleBlockManager的forMapTask函數生成,注意forMapTask中產生writers的代碼邏輯。

每個writer分配一下文件, 文件名由三元組(shuffle_id,map_id,reduce_id)組成,如果知道了這個三元組就可以找到對應的文件。

如果consolidation沒有打開,那么在一個task中,有多少個輸出的partition就會有多少個中間文件。

      val writers: Array[BlockObjectWriter] = if (consolidateShuffleFiles) {
        fileGroup = getUnusedFileGroup()
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          blockManager.getDiskWriter(blockId, fileGroup(bucketId), serializer, bufferSize)
        }
      } else {
        Array.tabulate[BlockObjectWriter](numBuckets) { bucketId =>
          val blockId = ShuffleBlockId(shuffleId, mapId, bucketId)
          val blockFile = blockManager.diskBlockManager.getFile(blockId)
          // Because of previous failures, the shuffle file may already exist on this machine.
          // If so, remove it.
          if (blockFile.exists) {
            if (blockFile.delete()) {
              logInfo(s"Removed existing shuffle file $blockFile")
            } else {
              logWarning(s"Failed to remove existing shuffle file $blockFile")
            }
          }
          blockManager.getDiskWriter(blockId, blockFile, serializer, bufferSize)
        }
      }

getFile負責將三元組(shuffle_id,map_id,reduce_id)映射到文件名

def getFile(filename: String): File = {
    // Figure out which local directory it hashes to, and which subdirectory in that
    val hash = Utils.nonNegativeHash(filename)
    val dirId = hash % localDirs.length
    val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

    // Create the subdirectory if it doesn't already exist
    var subDir = subDirs(dirId)(subDirId)
    if (subDir == null) {
      subDir = subDirs(dirId).synchronized {
        val old = subDirs(dirId)(subDirId)
        if (old != null) {
          old
        } else {
          val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
          newDir.mkdir()
          subDirs(dirId)(subDirId) = newDir
          newDir
        }
      }
    }

    new File(subDir, filename)
  }

  def getFile(blockId: BlockId): File = getFile(blockId.name)

產生的文件在哪呢,如果沒有更改默認的配置,生成的目錄結構類似於下

/tmp/spark-local-20140723092540-7f24
/tmp/spark-local-20140723092540-7f24/0d
/tmp/spark-local-20140723092540-7f24/0d/shuffle_0_0_1
/tmp/spark-local-20140723092540-7f24/0d/shuffle_0_1_0
/tmp/spark-local-20140723092540-7f24/0c
/tmp/spark-local-20140723092540-7f24/0c/shuffle_0_0_0
/tmp/spark-local-20140723092540-7f24/0e
/tmp/spark-local-20140723092540-7f24/0e/shuffle_0_1_1 

當所有的數據寫入文件並提交以后,還需要生成MapStatus匯報給driver application. MapStatus在哪生成的呢?commitWritesAndBuildStatus就干這活。

調用關系HashShuffleWriter.stop->commitWritesAndBuildStatus

private def commitWritesAndBuildStatus(): MapStatus = {
    // Commit the writes. Get the size of each bucket block (total block size).
    var totalBytes = 0L
    var totalTime = 0L
    val compressedSizes = shuffle.writers.map { writer: BlockObjectWriter =>
      writer.commit()
      writer.close()
      val size = writer.fileSegment().length
      totalBytes += size
      totalTime += writer.timeWriting()
      MapOutputTracker.compressSize(size)
    }

    // Update shuffle metrics.
    val shuffleMetrics = new ShuffleWriteMetrics
    shuffleMetrics.shuffleBytesWritten = totalBytes
    shuffleMetrics.shuffleWriteTime = totalTime
    metrics.shuffleWriteMetrics = Some(shuffleMetrics)

    new MapStatus(blockManager.blockManagerId, compressedSizes)
  }

compressedSize是一個非常讓人疑惑的地方,原因慢慢道來,先看一下MapStatus的構造函數

class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])

 compressedSize是一個byte數組,每一個byte反應了該partiton中的數據大小。如Array(0)=128就表示在data partition 0中有128byte數據。

問題的問題是一個byte只能表示255,如果超過255怎么辦呢?

當當當,數學閃亮登場了,注意到compressSize沒,通過轉換將2^8變換為1.1^256。一下子由255byte延伸到近35G.

看一看這神奇的compressSize函數吧,只是聊聊幾行代碼而已。

  def compressSize(size: Long): Byte = {
    if (size == 0) {
      0
    } else if (size <= 1L) {
      1
    } else {
      math.min(255, math.ceil(math.log(size) / math.log(LOG_BASE)).toInt).toByte
    }
  }

 ShuffleMapTask運行結束時,會將MapStatus結果封裝在StatusUpdate消息中匯報給SchedulerBackend, 由DAGScheduler在handleTaskCompletion函數中將MapStatus加入到相應的Stage。這一過程略過,不再詳述。

MapOutputTrackerMaster會保存所有最新的MapStatus.

只畫張圖來表示存儲之后的示意。

 

數據讀取過程

ShuffledRDD.compute函數是讀取過程的觸發點。

  override def compute(split: Partition, context: TaskContext): Iterator[P] = {
    val dep = dependencies.head.asInstanceOf[ShuffleDependency[K, V, C]]
    SparkEnv.get.shuffleManager.getReader(dep.shuffleHandle, split.index, split.index + 1, context)
      .read()
      .asInstanceOf[Iterator[P]]
  }

shuffleManager.getReader返回的是HashShuffleReader,所以看一看HashShuffleReader中的read函數的具體實現。

read函數處理邏輯中需要注意到一點即combine過程有可能會被再次執行。注意dep.aggregator.isDefined這一分支判斷。ReduceByKey(_ + _)中的(_ + _)在此處被執行。

override def read(): Iterator[Product2[K, C]] = {
    val iter = BlockStoreShuffleFetcher.fetch(handle.shuffleId, startPartition, context,
      Serializer.getSerializer(dep.serializer))

    if (dep.aggregator.isDefined) {
      if (dep.mapSideCombine) {
        new InterruptibleIterator(context, dep.aggregator.get.combineCombinersByKey(iter, context))
      } else {
        new InterruptibleIterator(context, dep.aggregator.get.combineValuesByKey(iter, context))
      }
    } else if (dep.aggregator.isEmpty && dep.mapSideCombine) {
      throw new IllegalStateException("Aggregator is empty for map-side combine")
    } else {
      iter
    }
  }

一路輾轉,終於來到了讀取過程中非常關鍵的所在BlockStoreShuffleFetcher。

BlockStoreShuffleFetcher需要回答如下問題

  1. 所要獲取的mapid的mapstatus的內容是什么
  2. 根據獲得的mapstatus去相應的blockmanager獲取具體的數據
 val blockManager = SparkEnv.get.blockManager

  val startTime = System.currentTimeMillis
  val statuses = SparkEnv.get.mapOutputTracker.getServerStatuses(shuffleId, reduceId)
  logDebug("Fetching map output location for shuffle %d, reduce %d took %d ms".format(
  shuffleId, reduceId, System.currentTimeMillis - startTime))

  val splitsByAddress = new HashMap[BlockManagerId, ArrayBuffer[(Int, Long)]]
  for (((address, size), index) 
    (address, splits.map(s => (ShuffleBlockId(shuffleId, s._1, reduceId), s._2)))
  }
  val blockFetcherItr = blockManager.getMultiple(blocksByAddress, serializer)
  val itr = blockFetcherItr.flatMap(unpackBlock)

一個ShuffleMapTask會生成一個MapStatus,MapStatus中含有當前ShuffleMapTask產生的數據落到各個Partition中的大小。如果大小為0,則表示該分區沒有數據產生。MapStatus中另一個重要的成員變量就是BlockManagerId,該變量表示目標數據在哪個BlockManager當中。

MapoutputTrackerMaster擁有最新的MapStatus信息,為了執行效率,MapoutputTrackerWorker會定期更新數據到本地,所以MapoutputTracker先從本地查找,如果找不到再從MapoutputTrackerMaster上同步最新數據。

索引即是reduceId,如果array(0) == 0,就表示上一個ShuffleMapTask中生成的數據中沒有任意的內容可以作為reduceId為0的ResultTask的輸入。如果不能理解,返回仔細看一下MapStatus的結構圖。

BlockManager.getMultiple用於讀取BlockManager中的數據,根據配置確定生成tNettyBlockFetcherIterator還是BasicBlockFetcherIterator。

如果所要獲取的文件落在本地,則調用getLocal讀取,否則發送請求到遠端blockmanager。看一下BlockFetcherIterator的initialize函數

    override def initialize() {
      // Split local and remote blocks.
      val remoteRequests = splitLocalRemoteBlocks()
      // Add the remote requests into our queue in a random order
      fetchRequests ++= Utils.randomize(remoteRequests)

      // Send out initial requests for blocks, up to our maxBytesInFlight
      while (!fetchRequests.isEmpty &&
        (bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
        sendRequest(fetchRequests.dequeue())
      }

      val numFetches = remoteRequests.size - fetchRequests.size
      logInfo("Started " + numFetches + " remote fetches in" + Utils.getUsedTimeMs(startTime))

      // Get Local Blocks
      startTime = System.currentTimeMillis
      getLocalBlocks()
      logDebug("Got local blocks in " + Utils.getUsedTimeMs(startTime) + " ms")
}

至此,數據讀取的正常流程講述完畢。

數據讀取異常

如果數據讀取中碰到異常怎么辦?比如,

  1. 已知(stage_2,task_0)產生的parition_0的數據在機器m1, 當前任務在m2執行,於是從m2向m1發起遠程獲取請求,如果m2中擁有目標數據的JVM進程異常退出,則相應的目標數據無法獲取。

如果無法獲取目標數據,就會上報FetchFailedException.

    def unpackBlock(blockPair: (BlockId, Option[Iterator[Any]])) : Iterator[T] = {
      val blockId = blockPair._1
      val blockOption = blockPair._2
      blockOption match {
        case Some(block) => {
          block.asInstanceOf[Iterator[T]]
        }
        case None => {
          blockId match {
            case ShuffleBlockId(shufId, mapId, _) =>
              val address = statuses(mapId.toInt)._1
              throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId)
            case _ =>
              throw new SparkException(
                "Failed to get block " + blockId + ", which is not a shuffle block")
          }
        }
      }
    }

 FetchFailedExecption會被包裝在StatutsUpdate上報給SchedulerBackend,然后一路處理下去,最終將丟失目標數據的歸屬Task重新提交。比如當前是(stage_1, task_0),需要讀取(stage_2, task_1)產生的目標數據,但是對應的目標數據丟失,這個時候就需要將(stage_2, task_1)重新提交運行。

注意DAGScheduler中的FetchFailed處理分支,一路跟蹤下去就會看到任務被重新提交了

  case FetchFailed(bmAddress, shuffleId, mapId, reduceId) =>
        // Mark the stage that the reducer was in as unrunnable
        val failedStage = stageIdToStage(task.stageId)
        runningStages -= failedStage
        // TODO: Cancel running tasks in the stage
        logInfo("Marking " + failedStage + " (" + failedStage.name +
          ") for resubmision due to a fetch failure")
        // Mark the map whose fetch failed as broken in the map stage
        val mapStage = shuffleToMapStage(shuffleId)
        if (mapId != -1) {
          mapStage.removeOutputLoc(mapId, bmAddress)
          mapOutputTracker.unregisterMapOutput(shuffleId, mapId, bmAddress)
        }
        logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
          "); marking it for resubmission")
        if (failedStages.isEmpty && eventProcessActor != null) {
          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
          // in that case the event will already have been scheduled. eventProcessActor may be
          // null during unit tests.
          import env.actorSystem.dispatcher
          env.actorSystem.scheduler.scheduleOnce(
            RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
        }
        failedStages += failedStage
        failedStages += mapStage
        // TODO: mark the executor as failed only if there were lots of fetch failures on it
        if (bmAddress != null) {
          handleExecutorLost(bmAddress.executorId, Some(task.epoch))
        }

文件清除

生成的中間數據是在什么時候被清除的呢?

當Driver Application退出的時候,該Application生成的臨時文件將會被一一清除,注意是application結束生命,不是job。一個application可以包含一至多個job。

實驗

以local-cluster方式運行spark-shell,觀察/tmp/spark-local*目錄下的文件變化,具體指令如下

MASTER=local-cluster[2,2,512] bin/spark-shell
#進入spark-shell之后,輸入
sc.textFile("README.md").flatMap(_.split(" ")).map(w=>(w,1)).reduceByKey(_ + _)

小結

Shuffle數據的寫入和讀取是Spark Core這一部分最為復雜的內容,徹底了解該部分內容才能深刻意識到Spark實現的精髓所在。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM