Spark Source Code Analysis -- How a Task Is Actually Executed


The example in Spark Source Code Analysis -- SparkContext was only analyzed up to sc.runJob.

So how is it finally executed? The DAGScheduler splits the job into Stages and wraps them into TaskSets, which are submitted to the TaskScheduler; the tasks then wait to be scheduled and finally run on an Executor.

val sc = new SparkContext(……)
val textFile = sc.textFile("README.md")
textFile.filter(line => line.contains("Spark")).count()

This is a fairly simple example with no shuffle; let's look at how it is executed on the Executor.
Since this job has only one stage, only ResultTasks are produced.

The key execution statement is:

func(context, rdd.iterator(split, context))
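This statement lives in ResultTask (see Spark Source Code Analysis -- Task). As a reminder of the surrounding context, ResultTask.run in the codebase of that era looks roughly like the following simplified sketch; minor details may differ from the actual source:

//ResultTask.run (simplified sketch)
override def run(attemptId: Long): U = {
  val context = new TaskContext(stageId, partition, attemptId, runningLocally = false)
  metrics = Some(context.taskMetrics)
  try {
    func(context, rdd.iterator(split, context))  // apply func to the iterator of the stage's last rdd
  } finally {
    context.executeOnCompleteCallbacks()
  }
}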

For this example, func is count(), which produces the final result, and rdd is the last RDD before count, i.e. the RDD produced by filter.

Note that RDD execution in Spark is driven not from front to back but backward from the last RDD. Why? Because cache and checkpoint have to be taken into account.

So a stage keeps only its last RDD; the other RDDs are derived backward through the dependencies. Here rdd.iterator is called to read that last RDD.
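As an illustration of this backward chaining, the RDD produced by filter is a FilteredRDD, whose compute pulls lazily from its parent's iterator; roughly, in the source of that era:

//FilteredRDD (simplified sketch)
private[spark] class FilteredRDD[T: ClassManifest](prev: RDD[T], f: T => Boolean)
  extends RDD[T](prev) {
  override def getPartitions: Array[Partition] = firstParent[T].partitions
  // compute pulls from the parent RDD's iterator, so evaluation chains from the last rdd backward
  override def compute(split: Partition, context: TaskContext) =
    firstParent[T].iterator(split, context).filter(f)
}

So when count consumes the FilteredRDD's iterator, each compute in turn calls iterator on its parent, all the way back to the RDD created by textFile.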

 

May I say that iterator is one of the most central functions in Spark? :-)

  final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }

If the RDD's storageLevel is not NONE, i.e. it is marked to be cached in memory or on disk, cacheManager.getOrCompute is called to read it; otherwise the partition is read from the checkpoint or computed directly.
CacheManager handles reading data from the cache, or recomputing the data and then caching it:

private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
  private val loading = new HashSet[String]

  /** Gets or computes an RDD split. Used by RDD.iterator() when an RDD is cached. */
  def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
      : Iterator[T] = {
    val key = "rdd_%d_%d".format(rdd.id, split.index)
    blockManager.get(key) match {  // look up the cached partition in the blockManager
      case Some(cachedValues) =>  // data was found, so this partition was cached before; just return it
        // Partition is in cache, so just return its values
        return cachedValues.asInstanceOf[Iterator[T]]

      case None => // nothing was found, so it was never cached; it has to be loaded again (computed or read from checkpoint)
        // Mark the split as loading (unless someone else marks it first)
        loading.synchronized { // lock to avoid loading the same RDD partition more than once
          if (loading.contains(key)) {
            while (loading.contains(key)) {
              try {loading.wait()} catch {case _ : Throwable =>} // someone else is already loading it, so just wait
            }
            // See whether someone else has successfully loaded it. The main way this would fail
            // is for the RDD-level cache eviction policy if someone else has loaded the same RDD
            // partition but we didn't want to make space for it. However, that case is unlikely
            // because it's unlikely that two threads would work on the same RDD partition. One
            // downside of the current code is that threads wait serially if this does happen.
            blockManager.get(key) match {
              case Some(values) =>
                return values.asInstanceOf[Iterator[T]]
              case None =>
                logInfo("Whoever was loading " + key + " failed; we'll try it ourselves")
                loading.add(key)
            }
          } else {
            loading.add(key) // record the key and start loading
          }
        }
        try {
          // If we got here, we have to load the split
          logInfo("Computing partition " + split)  // loading的過程,就是讀cp或重新compute
          val computedValues = rdd.computeOrReadCheckpoint(split, context) // compute的結果是iterator, 何處遍歷產生真實數據?
          // Persist the result, so long as the task is not running locally
          if (context.runningLocally) { return computedValues }
          val elements = new ArrayBuffer[Any]
          elements ++= computedValues  // ++= traverses the iterator, materializing the data into elements
          blockManager.put(key, elements, storageLevel, true) // cache the newly produced data via blockManager.put
          return elements.iterator.asInstanceOf[Iterator[T]]
        } finally {
          loading.synchronized {
            loading.remove(key)
            loading.notifyAll()
          }
        }
    }
  }
}
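For completeness, computeOrReadCheckpoint, which both iterator and getOrCompute fall back to, roughly does the following (simplified sketch; the actual source of that era may differ in minor details):

  private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
    if (isCheckpointed) {
      firstParent[T].iterator(split, context)  // a checkpointed RDD exposes the checkpoint data as its parent
    } else {
      compute(split, context)  // otherwise compute the partition
    }
  }

In our example nothing is persisted or checkpointed, so the compute branch runs every time; calling .cache() on the filtered RDD would route subsequent actions through CacheManager instead.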

 

How does the result of task execution get back to the DAGScheduler?

For the result value of a task, refer to Spark Source Code Analysis -- Task.
For a ResultTask it is the computed value, e.g. the count;
for a ShuffleMapTask it is MapStatus(blockManager.blockManagerId, compressedSizes), where compressedSizes holds the sizes of the data each shuffle bucket wrote to file.
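MapStatus itself is just a small message; a rough sketch of it from that era of the codebase (serialization methods omitted):

//MapStatus (simplified sketch)
// records which BlockManager holds the map output and the compressed size of each shuffle bucket
private[spark] class MapStatus(var location: BlockManagerId, var compressedSizes: Array[Byte])
  extends Externalizable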

//TaskRunner
val value = task.run(taskId.toInt)
val result = new TaskResult(value, accumUpdates, task.metrics.getOrElse(null))
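// (in the actual source the TaskResult is serialized here into serializedResult; the serialization line is omitted in this excerpt)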
context.statusUpdate(taskId, TaskState.FINISHED, serializedResult)  // context is the StandaloneExecutorBackend

//StandaloneExecutorBackend.statusUpdate
driver ! StatusUpdate(executorId, taskId, state, data)

//DriverActor.StatusUpdate
scheduler.statusUpdate(taskId, state, data.value)

//ClusterScheduler.statusUpdate
var taskSetToUpdate: Option[TaskSetManager] = None
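// (taskSetToUpdate is first resolved from the scheduler's active task sets by task id; the lookup is omitted in this excerpt)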
taskSetToUpdate.get.statusUpdate(tid, state, serializedData)

//ClusterTaskSetManager.statusUpdate
case TaskState.FINISHED =>
  taskFinished(tid, state, serializedData)

//ClusterTaskSetManager.taskFinished
val result = ser.deserialize[TaskResult[_]](serializedData)
result.metrics.resultSize = serializedData.limit()
sched.listener.taskEnded(tasks(index), Success, result.value, result.accumUpdates, info, result.metrics)
  //tasks = taskSet.tasks
  //info is a TaskInfo:
  class TaskInfo(
    val taskId: Long,
    val index: Int,
    val launchTime: Long,
    val executorId: String,
    val host: String,
    val taskLocality: TaskLocality.TaskLocality) 

//DAGScheduler.taskEnded
  override def taskEnded(
      task: Task[_],
      reason: TaskEndReason,
      result: Any,
      accumUpdates: Map[Long, Any],
      taskInfo: TaskInfo,
      taskMetrics: TaskMetrics) {
    eventQueue.put(CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
  }

//DAGScheduler.processEvent
handleTaskCompletion(completion)

//DAGScheduler.handleTaskCompletion
......

