深入理解spark-taskScheduler,schedulerBackend源碼分析


 

上次分析了dagshceduler是如何將任務拆分成job,stage,task的,但是拆分后的僅僅是一個邏輯結果,保存為一個resultstage對象,並沒執行;

而將任務正在執行的是spark的taskscheduler模塊和shcedulerbackend模塊,

taskcheduler模塊負責task的調度,schedulerbackend負責task的自願申請,這兩個結合比價緊密,實現也是在一起實現的;

 

點開sparkcontext的內部屬性,可以看到taskscheduler的的對象(org.apache.spark.scheduler.TaskScheduler)是一個trait(Scala的叫法,簡單的理解為類似於java的interface),這是因為task的提交方式有多種,可以是yarn-client模式,也可以是yarn-cluster模型,這取決於提交spark提交時候設置的參數master。

master設置不同,最終實現的也不同,當是yarn-client模式的時候,task實現方式則是yarnscheduler。

同樣的schedulerbackend也是一個trait,具體的實現也是根據spark.master來決定,如果是yarn-client模式,實現則是yarnclientschedulerbackend。

 

具體看一下代碼實現:

SparkContext#createTaskScheduler

sparkcontext中調用createtaskscheduler,根據master來決定生成的實際類型,taskscheduler,schedulerbackend

val (sched , ts) = SparkContext.createTaskScheduler(this, master) // 這里的master是"spark.master"參數的值,String類型
_schedulerBackend = sched//生成 schedulerBackend
_taskScheduler = ts//生成 taskScheduler
_taskScheduler .start()

 

進入到createtaskscheduler方法中,具體實現根據事master的模型,有yarn-client,yarn-cluster,local等;

我們只看yarn-client模式(平常用的比較多的時候,yarn-client模式的時候,driver在客戶端,那么輸出的日志也會在本地可以查看,yarn-cluster模式下driver是在資源管理器下的,首先日志不太方便查看),可以看到內部實現是根據match case來實現匹配的。yarn-clent模式下,schedulerbackend實現org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend,taskscheduler 實現org.apache.spark.scheduler.cluster.YarnScheduler;

 case "yarn-client" =>
        val scheduler = try {
          val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")
          val cons = clazz.getConstructor(classOf[SparkContext])
          cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl]

        } catch {
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }

        val backend = try {
          val clazz =
            Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend")
          val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext])
          cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend]
        } catch {
          case e: Exception => {
            throw new SparkException("YARN mode not available ?", e)
          }
        }

        scheduler.initialize(backend)
        (backend, scheduler)

 

在根據master獲得了實際調度類型之后,並沒有馬上返回,而是調用了scheduler.initizlize(backend)

TaskSchedulerImpl#initialize

在這個方法中,可以看到是根據spark的調度模式初始化一個調度池,這里可以看到spark有兩種調度模式(FIFO,FAIR兩種模式);

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // temporarily set rootPool name to empty 這里可以看到調度池初始化最小設置為0
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
      }
    }
    schedulableBuilder.buildPools()
  }

 

完成后會返回實際類型,然后啟動taskscheduler.start()

 

啟動了之后根據DAGscheduler提交的stage的類型,shufflestage還是resultstage去不同的執行:

TaskScheduler#submitTasks

 

未完待續.....

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM