深入理解spark-DAGscheduler源碼分析(上)


 

背景: 前幾天了解了spark了運行架構,spark代碼提交給driver時候會根據rdd生成DAG,那么實際DAG在代碼中是如何生成的呢?

 

首先了解,spark任務中的幾個划分點:

    1.job:job是由rdd的action來划分,每一個action操作是在spark任務執行時是一個job。(action的區分:rdd分為行動操作和轉化操作,因為我們知道rdd是惰性加載的,除非遇到行動操作,前面的所有的轉化操作才會執行,這也就是為什么spark任務由job來划分執行了,區分行動操作和轉化操作最簡單的方法就是看,rdd放回的值,如果返回的是一個rdd則是轉化操作,例如map,如果返回的是一個其他的數據類型則是行動操作,例如count)

   2.stage:根據rdd的寬窄依賴來划分(shuffle來區分),遇到shuffle,則將shuffle之前的窄依賴歸來一個stage;

   3.task:task是由最后的executor執行的最小任務,它最終落到各個executor上,實現分布式執行;

    

    簡單的歸納一下他們的關系:job -> stage -> task (job中有多個stage,stage中有多個task);

  

    spark運行時,一個任務由client提交,再由driver划分邏輯實現圖DAG,最后分配給各個executor上執行task;

    思考:任務是如何分配監聽的?hash分配,隨機分配?

 

   spark在任務拆分的時候,參考下圖:

    

 

    1.先由sparkcontext初始化,創建一個DAGshcheduler,啟動一個監聽器,監聽spark任務,spark拆分的所有任務都會發給這個監聽器;

    2.客戶端這邊,當我們調用action時,則action會向sparkcontext啟動一個runjob,即是將action任務(一個job)提交給DAGshcheduler的監聽器;

    3.接到job的DAGscheduler 會將任務交給handleJobSubmitted 來處理;

    4. 每個job會生成一個resultstage,其余的都是shufflestage,shufflestage是根據rdd的寬依賴來生成的,根據廣度優先遍歷rdd,遇到shufflestage就創建一個新的stage;

    5.形成DAG圖之后,遍歷執行stage列表,根據父子stage順序執行,如果上層未執行完,下層會一直等待;

    6.每個stage會拆分成多個task,交由taskshcheduler來分配,等待executor來執行完一個task后交給下一個task;

 

 

 1.  spark2.0中, 初始化sparksession  builder 中的sparkcontext在初始化的時候會創建一個dagscheduler的變量,

sparkcontext:

_dagScheduler = new DAGScheduler(this)

  

DAGscheduler的構造方法中,會自己創建一個DAGschedulereventprocessloop,並自啟動一個監聽器

DAGscheduler:

class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging {

     private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

   ……
   eventProcessLoop.start()

}

  

DAGschedulereventprocessloop的父類 EventLoop 中有個線程類Thread 會起一個線程監聽

EventLoop:

override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

  

其中核心就是一個線程只做onReceive操作,父類只是一個抽象類,子類實現這個方法,調用doOnReceive

  /**
   * The main event loop of the DAG scheduler.
   */
  override def onReceive(event: DAGSchedulerEvent): Unit = {
    val timerContext = timer.time()
    try {
      doOnReceive(event)
    } finally {
      timerContext.stop()
    }
  }

  

 

其最終單線程監聽循環執行的就是:

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    // 處理job提交任務
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) // 處理map提交的stage任務 case MapStageSubmitted(jobId, dependency, callSite, listener, properties) => dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties) // 處理map stage 取消的任務 case StageCancelled(stageId) => dagScheduler.handleStageCancellation(stageId) // 處理job 取消的任務 case JobCancelled(jobId) => dagScheduler.handleJobCancellation(jobId) // 處理job 組取消的任務 case JobGroupCancelled(groupId) => dagScheduler.handleJobGroupCancelled(groupId) // 處理所有job 取消的任務 case AllJobsCancelled => dagScheduler.doCancelAllJobs() // 處理executort完成分配的事件 case ExecutorAdded(execId, host) => dagScheduler.handleExecutorAdded(execId, host) // 處理executor對視事件 case ExecutorLost(execId) => dagScheduler.handleExecutorLost(execId, fetchFailed = false) case BeginEvent(task, taskInfo) => dagScheduler.handleBeginEvent(task, taskInfo) case GettingResultEvent(taskInfo) => dagScheduler.handleGetTaskResult(taskInfo) case completion @ CompletionEvent(task, reason, _, _, taskInfo, taskMetrics) => dagScheduler.handleTaskCompletion(completion) // 處理task丟失的事件 case TaskSetFailed(taskSet, reason, exception) => dagScheduler.handleTaskSetFailed(taskSet, reason, exception) // 處理重新提交失敗Stage事件 case ResubmitFailedStages => dagScheduler.resubmitFailedStages() }

  

 

 

 假如執行一個rdd 執行action操作,即是將rdd中由sparkcontext 調用 runjob方法,sparkcontext中的初始化的DAGscheduler來調用 submitjob將這一個event事件加入到

DAGscheduler的執行隊列中,等待線程順序執行

def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
   ........   

    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }

  

 

至此,一個rdd的執行操作已經進入DAG監聽器的隊列了,下一步由監聽器取按順序取出來doOreceive 按照event的實際類型來執行相應的操作:

如果調用JobSubmitted方法,則調用相對應的handleJobSubmitted。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM