From the previous posts we know that a submitted application is split by the DAGScheduler into jobs, stages, and tasks, which are finally handed to the TaskScheduler. When the TaskScheduler is created, both it and the SchedulerBackend are initialized according to the master URL, and a scheduling pool is set up.
1. Comparing the scheduling pools
The root scheduling pool is initialized according to the scheduling mode:
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  // temporarily set rootPool name to empty
  // note: the root pool is created with minShare = 0 and weight = 0
  rootPool = new Pool("", schedulingMode, 0, 0)
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
    }
  }
  schedulableBuilder.buildPools()
}
FIFO mode
The mode is chosen via spark.scheduler.mode and can be FIFO or FAIR; the default is FIFO.
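As a quick sketch of how an application would opt in (the app name and pool name below are made up for illustration), the mode is set on the SparkConf, and in FAIR mode a thread can route its jobs to a named pool via setLocalProperty:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch only: "my-app" and "production" are illustrative names.
val conf = new SparkConf()
  .setAppName("my-app")
  .set("spark.scheduler.mode", "FAIR") // default is FIFO

val sc = new SparkContext(conf)

// Jobs submitted from this thread go into the "production" pool
// (meaningful in FAIR mode; pools are defined in fairscheduler.xml).
sc.setLocalProperty("spark.scheduler.pool", "production")
```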
In FIFO mode nothing extra happens: FIFOSchedulableBuilder implements buildPools as a no-op, so no child pools are created, and addTaskSetManager simply adds the TaskSetManager straight to the root pool.
Put simply, the default FIFO mode sets up nothing at all.
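For reference, the FIFO builder in Spark's source is essentially empty (quoted from memory, so treat it as a sketch rather than the exact current code):

```scala
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
  extends SchedulableBuilder with Logging {

  override def buildPools() {
    // nothing to do: FIFO uses only the root pool
  }

  override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    rootPool.addSchedulable(manager)
  }
}
```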
FAIR mode
FAIR mode overrides buildPools: it reads the configuration file at the default path $SPARK_HOME/conf/fairscheduler.xml, or a user-defined configuration file specified via spark.scheduler.allocation.file.
Each pool in the file is configured with:
poolName: the name of the pool
schedulingMode: the scheduling mode used inside the pool (only FIFO and FAIR are supported)
minShare: the minimum share of CPU cores the pool should receive
weight: the pool's scheduling weight
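A minimal fairscheduler.xml putting those properties together might look like this (the pool names are illustrative):

```xml
<?xml version="1.0"?>
<allocations>
  <pool name="production">
    <schedulingMode>FAIR</schedulingMode>
    <weight>2</weight>
    <minShare>4</minShare>
  </pool>
  <pool name="adhoc">
    <schedulingMode>FIFO</schedulingMode>
    <weight>1</weight>
    <minShare>0</minShare>
  </pool>
</allocations>
```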
override def buildPools() {
  var is: Option[InputStream] = None
  try {
    is = Option {
      schedulerAllocFile.map { f =>
        new FileInputStream(f)
      }.getOrElse {
        Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
      }
    }
    is.foreach { i => buildFairSchedulerPool(i) }
  } finally {
    is.foreach(_.close())
  }
  // finally create "default" pool
  buildDefaultPool()
}
FairSchedulableBuilder also overrides the addTaskSetManager method:
override def addTaskSetManager(manager: Schedulable, properties: Properties) {
  var poolName = DEFAULT_POOL_NAME
  var parentPool = rootPool.getSchedulableByName(poolName)
  if (properties != null) {
    poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME)
    parentPool = rootPool.getSchedulableByName(poolName)
    if (parentPool == null) {
      // we will create a new pool that user has configured in app
      // instead of being defined in xml file
      parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE,
        DEFAULT_WEIGHT)
      rootPool.addSchedulable(parentPool)
      logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
        poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
    }
  }
  parentPool.addSchedulable(manager)
  logInfo("Added task set " + manager.name + " tasks to pool " + poolName)
}
This logic places the pools from the configuration file (plus the default pool) into rootPool, then stores each TaskSetManager into the matching child pool of rootPool; if an application names a pool that was not defined in the XML file, a new pool with default settings is created for it on the fly.
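That lookup-or-create behaviour can be mimicked with a minimal, self-contained sketch (these stub classes are illustrative, not Spark's):

```scala
import scala.collection.mutable

// Stub pool: just a name, default settings, and its members.
class SimplePool(val name: String, val minShare: Int = 0, val weight: Int = 1) {
  val members = mutable.Buffer[String]()
}

class SimpleRootPool {
  private val pools = mutable.Map[String, SimplePool]()
  // Return the named pool, creating it with default settings on a miss,
  // mirroring what addTaskSetManager does for pools absent from the XML file.
  def getOrCreate(name: String): SimplePool =
    pools.getOrElseUpdate(name, new SimplePool(name))
}

val root = new SimpleRootPool
val p1 = root.getOrCreate("etl")
p1.members += "taskset-1"
val p2 = root.getOrCreate("etl")
// The second lookup returns the same pool instance, not a fresh one.
assert(p1 eq p2)
```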
2. Comparing the scheduling algorithms
Besides initializing different pools, the two modes also implement different scheduling algorithms.
Pool itself selects the algorithm internally according to the mode:
var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
  schedulingMode match {
    case SchedulingMode.FAIR =>
      new FairSchedulingAlgorithm()
    case SchedulingMode.FIFO =>
      new FIFOSchedulingAlgorithm()
  }
}
FIFO mode
The FIFO ordering is easy to understand: it first compares priority (for a TaskSetManager this is its job ID), and on a tie compares stage IDs; the smaller one runs first.
This also makes intuitive sense: a stage with a smaller ID is generally at the bottom of the recursive dependency chain and was submitted to the scheduling pool earliest.
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}
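To see the ordering concretely, here is a self-contained sketch (the Stub class and its values are made up, not Spark's Schedulable):

```scala
case class Stub(priority: Int, stageId: Int, name: String)

// Same ordering as FIFOSchedulingAlgorithm.comparator:
// lower priority (the job ID) first, then lower stage ID.
def fifoLess(s1: Stub, s2: Stub): Boolean =
  if (s1.priority != s2.priority) s1.priority < s2.priority
  else s1.stageId < s2.stageId

val queue = Seq(Stub(2, 5, "b"), Stub(1, 7, "a"), Stub(1, 3, "c"))
val ordered = queue.sortWith(fifoLess)
// ordered.map(_.name) == Seq("c", "a", "b")
```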
FAIR mode
FAIR mode is slightly more involved, but still easy to follow:
1. A schedulable whose running task count is below its minShare is considered "needy" and is scheduled ahead of one that is not.
2. If both are needy, compare the ratio runningTasks / minShare; the smaller ratio runs first.
3. If neither is needy, compare the ratio runningTasks / weight; the smaller ratio runs first, so at equal task counts the pool with the larger weight has higher priority.
4. If all of the above are equal, fall back to a lexicographic comparison of names; the smaller name runs first.
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val minShare1 = s1.minShare
    val minShare2 = s2.minShare
    val runningTasks1 = s1.runningTasks
    val runningTasks2 = s2.runningTasks
    val s1Needy = runningTasks1 < minShare1
    val s2Needy = runningTasks2 < minShare2
    val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble
    val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble
    val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
    val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
    var compare: Int = 0
    if (s1Needy && !s2Needy) {
      return true
    } else if (!s1Needy && s2Needy) {
      return false
    } else if (s1Needy && s2Needy) {
      compare = minShareRatio1.compareTo(minShareRatio2)
    } else {
      compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
    }
    if (compare < 0) {
      true
    } else if (compare > 0) {
      false
    } else {
      s1.name < s2.name
    }
  }
}
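The decision order can be checked with a self-contained sketch that mirrors the comparator step by step (PoolStub and its values are illustrative, not Spark's classes):

```scala
// Illustrative stand-in for Spark's Schedulable.
case class PoolStub(name: String, runningTasks: Int, minShare: Int, weight: Int)

def fairLess(s1: PoolStub, s2: PoolStub): Boolean = {
  val s1Needy = s1.runningTasks < s1.minShare
  val s2Needy = s2.runningTasks < s2.minShare
  if (s1Needy && !s2Needy) return true   // a needy pool wins outright
  if (!s1Needy && s2Needy) return false
  val compare =
    if (s1Needy && s2Needy)
      // both below minShare: smaller runningTasks/minShare ratio first
      (s1.runningTasks.toDouble / math.max(s1.minShare, 1))
        .compareTo(s2.runningTasks.toDouble / math.max(s2.minShare, 1))
    else
      // neither needy: smaller runningTasks/weight ratio first
      (s1.runningTasks.toDouble / s1.weight)
        .compareTo(s2.runningTasks.toDouble / s2.weight)
  if (compare != 0) compare < 0 else s1.name < s2.name
}

// A pool below its minShare beats one that has met it:
assert(fairLess(PoolStub("needy", 1, 4, 1), PoolStub("full", 8, 4, 1)))
// Neither needy: at equal running tasks the larger weight wins:
assert(fairLess(PoolStub("heavy", 8, 0, 2), PoolStub("light", 8, 0, 1)))
```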
Summary: it was worth getting to know Spark's scheduling modes. I had hardly ever touched them in day-to-day use, and did not realize Spark had this hidden capability.