上次分析了dagshceduler是如何將任務拆分成job,stage,task的,但是拆分后的僅僅是一個邏輯結果,保存為一個resultstage對象,並沒執行;
而將任務正在執行的是spark的taskscheduler模塊和shcedulerbackend模塊,
taskcheduler模塊負責task的調度,schedulerbackend負責task的自願申請,這兩個結合比價緊密,實現也是在一起實現的;
點開sparkcontext的內部屬性,可以看到taskscheduler的的對象(org.apache.spark.scheduler.TaskScheduler)是一個trait(Scala的叫法,簡單的理解為類似於java的interface),這是因為task的提交方式有多種,可以是yarn-client模式,也可以是yarn-cluster模型,這取決於提交spark提交時候設置的參數master。
master設置不同,最終實現的也不同,當是yarn-client模式的時候,task實現方式則是yarnscheduler。
同樣的schedulerbackend也是一個trait,具體的實現也是根據spark.master來決定,如果是yarn-client模式,實現則是yarnclientschedulerbackend。
具體看一下代碼實現:
SparkContext#createTaskScheduler
sparkcontext中調用createtaskscheduler,根據master來決定生成的實際類型,taskscheduler,schedulerbackend
val (sched , ts) = SparkContext.createTaskScheduler(this, master) // 這里的master是"spark.master"參數的值,String類型 _schedulerBackend = sched//生成 schedulerBackend _taskScheduler = ts//生成 taskScheduler _taskScheduler .start()
進入到createtaskscheduler方法中,具體實現根據事master的模型,有yarn-client,yarn-cluster,local等;
我們只看yarn-client模式(平常用的比較多的時候,yarn-client模式的時候,driver在客戶端,那么輸出的日志也會在本地可以查看,yarn-cluster模式下driver是在資源管理器下的,首先日志不太方便查看),可以看到內部實現是根據match case來實現匹配的。yarn-clent模式下,schedulerbackend實現org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend,taskscheduler 實現org.apache.spark.scheduler.cluster.YarnScheduler;
case "yarn-client" => val scheduler = try { val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler") val cons = clazz.getConstructor(classOf[SparkContext]) cons.newInstance(sc).asInstanceOf[TaskSchedulerImpl] } catch { case e: Exception => { throw new SparkException("YARN mode not available ?", e) } } val backend = try { val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend") val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext]) cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend] } catch { case e: Exception => { throw new SparkException("YARN mode not available ?", e) } } scheduler.initialize(backend) (backend, scheduler)
在根據master獲得了實際調度類型之后,並沒有馬上返回,而是調用了scheduler.initizlize(backend)
TaskSchedulerImpl#initialize
在這個方法中,可以看到是根據spark的調度模式初始化一個調度池,這里可以看到spark有兩種調度模式(FIFO,FAIR兩種模式);
def initialize(backend: SchedulerBackend) { this.backend = backend // temporarily set rootPool name to empty 這里可以看到調度池初始化最小設置為0 rootPool = new Pool("", schedulingMode, 0, 0) schedulableBuilder = { schedulingMode match { case SchedulingMode.FIFO => new FIFOSchedulableBuilder(rootPool) case SchedulingMode.FAIR => new FairSchedulableBuilder(rootPool, conf) } } schedulableBuilder.buildPools() }
完成后會返回實際類型,然后啟動taskscheduler.start()
啟動了之后根據DAGscheduler提交的stage的類型,shufflestage還是resultstage去不同的執行:
TaskScheduler#submitTasks
未完待續.....