在一個application內部,不同線程提交的Job默認按照FIFO順序來執行,假設線程1先提交了一個job1,線程2后提交了一個job2,那么默認情況下,job2必須等待job1執行完畢后才能執行,如果job1是一個長作業,而job2是一個短作業,那么這對於提交job2的那個線程的用戶來說很不友好:我這個job是一個短作業,怎么執行了這么長時間。
使用spark的公平調度算法可以在一定程度上解決這個問題,此時,job2不必等待job1完全運行完畢之后就可以獲得集群資源來執行,最終的效果的就是,job2可能會在job1之前運行完畢。這對於一個更強調對資源的公平競爭的多用戶場景下是非常有用的,每一個用戶都可以獲得等量的資源,當然你可以為每一個用戶指定一個優先級/權重,優先級/權重越高,獲得的資源越多,比如對於一個長作業,你可以為指定更高的權重,而對於短作業,指定一個相對較低的權重。
沒有顯示配置fairScheduler.xml下的公平調度算法
假設線程1提交了一個action,這個action觸發了一個jobId為1的job。同時,在提交這個action之前,設置了spark.scheduler.pool:
SparkContext.setLocalProperty(“spark.scheduler.pool”,”pool_name_1”)
假設線程2提交了一個action,這個action觸發了一個jobId為2的job。同時,在提交這個action之前,也設置了spark.scheduler.pool:
SparkContext.setLocalProperty(“spark.scheduler.pool”,”pool_name_1”)
假設線程3提交了一個action,這個action觸發了一個JobId=3的job,但是這個線程並沒有設置spark.scheduler.pool屬性。
最后的spark 資源池邏輯上如下圖所示:
rootPool這個池子里面有三個小池子,其名字分別為:pool_name1,pool_name2,default;pool_name1這個池子存儲線程1提交的job,pool_name2存儲線程2提交的job,default池子存儲那些沒有顯示設置spark.scheduler.pool的線程提交的job,換句話說我們將不同線程提交的job給隔離到不同的池子里了。
每一個小池子都有以下三個可以配置的屬性:weight,minshare,mode,他們的默認值如下:
weight=1
minshare=0
mode=FIFO
一個池子的weight值越大,其獲得資源就越多,在上圖中,因為這三個池子的weight值相同,所以他們將獲得等量的資源。
一個池子的minShare表示這個池子至少獲得的core個數。
mode可以是FIFO或者FAIR,如果為FIFO,那么池子里jobid越大的job(等價的,先提交的job),將越先獲得集群資源;如果是FAIR,那么將采用一種更加公平的機制來調度job,這個后面再說。
顯示配置fairScheduler.xml的公平調度算法
可以發現,上面那種通過在線程里設置spark.scheduler.pool的方式,所創建的池子的屬性采用的都是默認值,而且一旦創建好之后你就不能再修改他們。spark提供了另外一種創建池子的方式,你可以配置conf/fairScheduler.xml文件,假設其內容如下(官方提供的內容):
<?xml version="1.0"?> <allocations> <pool name="production"> <schedulingMode>FAIR</schedulingMode> <weight>1</weight> <minShare>2</minShare> </pool> <pool name="test"> <schedulingMode>FIFO</schedulingMode> <weight>2</weight> <minShare>3</minShare> </pool> </allocations>
如果配置了fairScheduler.xml文件,並且其內容如上所示,那么此時的spark 資源池的樣子大致如下:
這個資源池里面同樣有三個小池子,其名字分別為:production,test,default。其中production資源池的weight為2,他將獲得更多的資源(與default池子相比),由於其minShare=3,所以他最低將獲得3個core,其mode=FAIR,所以提交到這個池子里的job將按照FAIR算法來調度。
事實上,通過這兩個圖已經能夠在腦海里對spark資源池產生一個大致的印象了,此時再去看spark 資源池的源碼就會非常容易。
在初次閱讀FIFO算法源碼之前:需要重點關注兩個屬性,priority和stageId,其中的priority就是jobid。先提交的job,其jobid越小,因此priority就越小。finalStage其stageId最大,其parent stage 的stageId較小。
對於公平調度算法,給定兩個池子a和b,誰優先獲得資源?
1.如果a阻塞了但是b沒有阻塞,那么先執行a
2.如果a沒有阻塞,b阻塞了,先執行b
3.如果a和b都阻塞了,那么阻塞程度高(等待執行的task比例大) 的那個先執行
4.如果a和b都沒有阻塞,那么資源少的那個先執行。
5.如果以上條件都不滿足,那么按照a和b的名字來排序。
還是看一下這個算法的實現吧:
private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { override def comparator(s1: Schedulable, s2: Schedulable): Boolean = { val minShare1 = s1.minShare val minShare2 = s2.minShare val runningTasks1 = s1.runningTasks val runningTasks2 = s2.runningTasks val s1Needy = runningTasks1 < minShare1 val s2Needy = runningTasks2 < minShare2 val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble var compare: Int = 0 //如果s1阻塞,s2沒有阻塞,那么就先執行s1 if (s1Needy && !s2Needy) { return true } else if (!s1Needy && s2Needy) { return false //如果s1沒有,s2阻塞了,就先執行s2 } else if (s1Needy && s2Needy) {//如果二者都阻塞了,那就看誰阻塞程度大 compare = minShareRatio1.compareTo(minShareRatio2) } else {//都沒阻塞,那么看誰的資源少。 compare = taskToWeightRatio1.compareTo(taskToWeightRatio2) } if (compare < 0) { true } else if (compare > 0) { false } else { s1.name < s2.name//實在不行了,按照池子的name排序吧。 } } }
源碼走讀
TaskSchedulerImpl在收到DAGScheduler提交的TaskSet時執行如下方法:
override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") this.synchronized { //創建TaskSetManager val manager = createTaskSetManager(taskSet, maxTaskFailures) val stage = taskSet.stageId val stageTaskSets = taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager]) stageTaskSets(taskSet.stageAttemptId) = manager val conflictingTaskSet = stageTaskSets.exists { case (_, ts) => ts.taskSet != taskSet && !ts.isZombie } if (conflictingTaskSet) { throw new IllegalStateException(s"more than one active taskSet for stage $stage:" + s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}") } //將TaskSetManager添加到資源池,properties里面存儲了我們調用SparkContext.setLocalProperty時傳遞的poolName schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) if (!isLocal && !hasReceivedTask) { starvationTimer.scheduleAtFixedRate(new TimerTask() { override def run() { if (!hasLaunchedTask) { logWarning("Initial job has not accepted any resources; " + "check your cluster UI to ensure that workers are registered " + "and have sufficient resources") } else { this.cancel() } } }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS) } hasReceivedTask = true } //調用CoarseGrainedSchedulerBakckend的reviveOffers() backend.reviveOffers() }
這個方法主要做兩件事情:第一創建TaskSetManager然后將其添加到資源池,第二調用CoarseGrainedSchedulerBackend進行資源調度。
我們重點講解資源池的構造和資源池的添加,因此重點關注schedulerBuilder。他是一個trait,主要有兩個實現:FIFOSchedulableBuilder和FairSchedulableBuilder。
/** * An interface to build Schedulable tree * buildPools: build the tree nodes(pools) * addTaskSetManager: build the leaf nodes(TaskSetManagers) */ private[spark] trait SchedulableBuilder { def rootPool: Pool //構建資源池,在創建SchedulerBuilder時,會調用buildPools方法來構建池子 def buildPools() //將資源池添加到rootPool中 def addTaskSetManager(manager: Schedulable, properties: Properties) }
這里重點關注他的FairSchedulerBuilder。
addTaskSetManager
override def addTaskSetManager(manager: Schedulable, properties: Properties) { var poolName = DEFAULT_POOL_NAME // var parentPool = rootPool.getSchedulableByName(poolName) //如果用戶設置了spark.scheduler.pool if (properties != null) { //默認用戶設置的spark.scheduler.pool的值 poolName = properties.getProperty(FAIR_SCHEDULER_PROPERTIES, DEFAULT_POOL_NAME) parentPool = rootPool.getSchedulableByName(poolName) //如果沒有這個池子,就創建一個新的池子 if (parentPool == null) { // we will create a new pool that user has configured in app // instead of being defined in xml file //此時的mode,minshare,weight都采用默認值,因此可以發現,在通過設置spark.scheduler.pool這種方式生成的池子 //采用的都是默認值 parentPool = new Pool(poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT) //最后將新創建的池子添加到rootPool中。 rootPool.addSchedulable(parentPool) logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( poolName, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)) } } parentPool.addSchedulable(manager) logInfo("Added task set " + manager.name + " tasks to pool " + poolName) }
從這里可以看出,通過在線程里使用SparkContext.setLocalProperty來設置spark.scheduler.pool所生成的資源池,其weight,minShare,mode采用的都是默認值,這在某些場景可能不滿足用戶要求,此時就需要顯示的配置fairScheduler.xml文件了。
如果用戶創建了fairScheduler.xml,那么會調用buildPools讀取這個文件,來創建用戶配置的池子:
override def buildPools() { var is: Option[InputStream] = None try { is = Option { schedulerAllocFile.map { f => new FileInputStream(f) }.getOrElse { Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE) } } //構建fairScheduler.xml中指定的池子 is.foreach { i => buildFairSchedulerPool(i) } } finally { is.foreach(_.close()) } // finally create "default" pool //構建默認池子,也就是default池子。 buildDefaultPool() }
private def buildFairSchedulerPool(is: InputStream) { val xml = XML.load(is) for (poolNode <- (xml \\ POOLS_PROPERTY)) { val poolName = (poolNode \ POOL_NAME_PROPERTY).text var schedulingMode = DEFAULT_SCHEDULING_MODE var minShare = DEFAULT_MINIMUM_SHARE var weight = DEFAULT_WEIGHT val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text if (xmlSchedulingMode != "") { try { schedulingMode = SchedulingMode.withName(xmlSchedulingMode) } catch { case e: NoSuchElementException => logWarning("Error xml schedulingMode, using default schedulingMode") } } val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text if (xmlMinShare != "") { minShare = xmlMinShare.toInt } val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text if (xmlWeight != "") { weight = xmlWeight.toInt } //創建用戶配置的池子 val pool = new Pool(poolName, schedulingMode, minShare, weight) rootPool.addSchedulable(pool) logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format( poolName, schedulingMode, minShare, weight)) } }
總結:
如果開啟了fari公平調度算法,並且在提交action的線程里面設置了sparkContext.setLocalPropery("spark.scheduler.pool",poolname),那么這個線程提交的所有job都被提交到poolName指定的資源池里,如果poolName指定的資源池不存在,那么將使用默認值來自動創建他。一種更加靈活的創建池子的方式是用戶顯示的配置fairScheduler.xml文件,你可以顯示的設置池子的weight,minShare,mode值。
由於本人接觸spark時間不長,如有錯誤或者任何意見可以在留言或者發送郵件到franciswbs@163.com,讓我們一起交流。
作者:FrancisWang
郵箱:franciswbs@163.com
出處:http://www.cnblogs.com/francisYoung/
本文地址:http://www.cnblogs.com/francisYoung/p/5209798.html
本文版權歸作者和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。