spark 作業調度


一、調度分類

調度分為兩種,一是應用之間的,二是應用內部作業的。

(一)應用之間

我們前面幾章有說過,一個spark-submit提交的是一個應用,不同的應用之間是有調度的,這個就由資源分配者來調度。如果我們使用Yarn,那么就由Yarn來調度。調度方式的配置就在$HADOOP_HOME/etc/hadoop/yarn-site.xml中

 

[html]  view plain  copy
 
  1. <property>  
  2. <name>yarn.resourcemanager.scheduler.class</name>   
  3. <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>  
  4. </property>  

 

 

(二)應用內部

參考《Spark基礎入門(三)--------作業執行方式》可以看到,SparkContext底層會觸發調用runJob的方法阻塞式的提交job,提交job的線程會處於阻塞狀態,同一個線程中,后面的job需要等待前面job完成才能提交。但當多線程執行時,則可以並發提交Job。

例如SparkStreaming運行並發提交時,可以看到一個SparkStreaming的項目中多個job在同時跑:

 

再例如Thriftserver,多個用戶通過beeline連接Thriftserver提交自己的查詢,所有的查詢都是並行運行的:

 

我們重點介紹應用內部的調度,調度方式的配置在

$SPAKR_HOME/conf/spark-defaults.conf

 

[html]  view plain  copy
 
  1. spark.scheduler.mode = FIFO/FAIR  



 

二、調度原理

結合《Spark基礎入門(三)--------作業執行方式》

 

(一)作業提交與調度池的創建

 

 

 

1. DAGScheduler采取的生產者消費者模型,存在一個Event隊列,用戶和TaskScheduler會生產event到這個隊列中,DAGScheduler中會有一個Daemon線程去消費這些event並產生對應的處理。DAGScheduler可以處理的Event包括:JobSubmitted、CompletionEvent、ExecutorLost、TaskFailed、StopDAGScheduler。

2. DAGScheduler 在接收到JobSubmitted的Event之后,會首先計算出其DAG圖,然后划分Stage,最后提交TaskSet到TaskScheduler(通過調用TaskScheduler的submitTasks,TaskScheduler還有cancelTasks的方法)

3. TaskScheduler的submitTasks方法最后會創建TaskManager的實例,由它去管理里面的TaskSet。

4. SparkContext是多線程安全的,可以有多個線程提交Job,這個Job也就是sparkAction

5. 每個線程提交Job時,是按Stage為最小單位來提交的,提交一個stage的TaskSet(一堆task任務)有一個TaskSetManager會來管理TaskSet,一個TaskSet對應一個TaskSetManager

6. TaskScheduler在初始化時,會創建一個Pool,用於調度;還會創建SchedulerBuilder,會去構造剛剛這個Pool。

7. SchedulerBuilder在TaskSchedulerImpl類中的定義如下,SchedulerBuilder會根據用戶設定的調度模式(比如FIFO或者Fair)調用其buildPools方法,將下面的TaskSetManager按照一定的組織形式放到Pool中。上圖綠色框圖圈出來的部分。比如使用的FIFO,則以FIFOSchedulableBuilder類來build pool,如果使用FAIR,則使用FairChedulableBuilder

 

[java]  view plain  copy
 
  1. var schedulableBuilder: SchedulableBuilder = null  
  2. ...  
  3.   def initialize(backend: SchedulerBackend) {  
  4.     this.backend = backend  
  5.     // temporarily set rootPool name to empty  
  6.     rootPool = new Pool("", schedulingMode, 0, 0)  
  7. schedulableBuilder = {  
  8.   schedulingMode match {  
  9.     case SchedulingMode.FIFO =>  
  10.       //rootPool包含了一組TaskSetManager  
  11.       new FIFOSchedulableBuilder(rootPool)  
  12.         case SchedulingMode.FAIR =>  
  13.           //rootPool包含了一組Pool樹,這棵樹的葉子節點都是TaskSetManager  
  14.           new FairSchedulableBuilder(rootPool, conf)  
  15.         }  
  16.      }  
  17.      schedulableBuilder.buildPools() //在FIFO中的實現是空  
  18. }  

 

(二)作業調度

 

 

上述都是基礎工作,pool和調度對象建立聯系之后,才開始真正的調度。

1. 調度由TaskScheduler進行,只有在有新的計算資源時才會進行作業調度。

2. TaskScheduler后面還有SchedulerBackend,SchedulerBackend會負責與Executor交互。

3. SchedulerBackend會調用makeOffers,觸發TaskScheduler調用resourceOffers方法。resourceOffers方法會根據當前的設置,選用一個調度算法,進行作業調度。

 

[java]  view plain  copy
 
  1. var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {  
  2.   schedulingMode match {  
  3. case SchedulingMode.FAIR =>  
  4.   new FairSchedulingAlgorithm()  
  5. case SchedulingMode.FIFO =>  
  6.   new FIFOSchedulingAlgorithm()  
  7. }  
  8. }  

 

4. 有兩種觸發SchedulerBackend調用makeOffers的場景:

(1) 定時任務:SchedulerBackend在啟動時會創建DriverEndPoint,DriverEndPoint中有定時任務,一定時間(spark.scheduler.revive.interval,默認為1s)進行一次調度(給自身發送ReviveOffers消息, 進行調用makeOffers進行調度)

(2)資源釋放:當Executor執行完成已分配任務時,會向Driver發送StatusUpdate消息,表明一個Executor資源已經釋放,則調用makeOffers(executorId)方法。

 

三、調度算法

 

(一)FIFO(First in first out)

三個線程提交三個Job,則按照順序,先執行Job1,執行結束之后再執行Job2,然后再執行Job3。

1. buildPools算法

對於FIFO模式的調度,rootPool管理的直接就是TaskSetManager。SchedulerBuilder的buildPools方法會遍歷所有的TaskSetManager,然后將他們直接掛在rootPool下面。

FIFO調度模式只有一層,會在葉子節點TaskSetManager中選擇調度哪一個

[java]  view plain  copy
 
  1. /**FIFO模式下的Pools的構建/   
  2. private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)   
  3. extends SchedulableBuilder with Logging {   
  4. override def buildPools() {   
  5. // 實際什么都不做   
  6. }   
  7. //添加下級調度實體的時候,直接添加到rootPool   
  8. override def addTaskSetManager(manager: Schedulable, properties: Properties) {   
  9. rootPool.addSchedulable(manager)   
  10. }   
  11. }  

2. 調度算法

 

[java]  view plain  copy
 
  1. /** 
  2.  * FIFO排序的實現,主要因素是優先級、其次是對應的Stage 
  3.  * 優先級高的在前面,優先級相同,則靠前的stage優先 
  4.  */  
  5. private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {  
  6.     override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {  
  7.     //優先級越小優先級越高  
  8.     val priority1 = s1.priority  
  9.     val priority2 = s2.priority  
  10.     var res = math.signum(priority1 - priority2)  
  11.     if (res == 0) {  
  12.         //如果優先級相同,那么Stage靠前的優先  
  13.         val stageId1 = s1.stageId  
  14.         val stageId2 = s2.stageId  
  15.         res = math.signum(stageId1 - stageId2)  
  16.     }  
  17.     if (res < 0) {  
  18.         true  
  19.     } else {  
  20.        false  
  21.     }  
  22.  }  
  23. }  

 

首先比較優先級

如果優先級相同,就比較Stage的大小。

在FIFO中,優先級即是JobID。而JobID是順序生成的,所以也就是先生成的JobID比較小,參考代碼可以看到優先級(JobID)越小,越先調度。

對同一個作業(Job)來說越先生成的Stage,其StageId越小,

有依賴關系的多個Stage之間,DAGScheduler會控制Stage是否會被提交到調度隊列中(若其依賴的Stage未執行完前,此Stage不會被提交),其調度順序可通過此來保證。但若某Job中有兩個無入度的Stage的話,則先調度StageId小的Stage。

3. 實驗

 

這個算法對外表現出來就是一個Job1先執行完了之后下一個Job2,那么如果Job1運行需要3個小時,而Job2運行只需要1分鍾,結果Job2從提交到結束會需要3小時一分鍾。非常不友好、不靈活。

(二)FAIR

首先配置$SPAKR_HOME/conf/spark-defaults.conf

 

[html]  view plain  copy
 
  1. spark.scheduler.mode    =  FAIR  

1. buildPools算法

 

[java]  view plain  copy
 
  1. /**FAIR模式下的Pools的構建*/  
  2. private[spark] class FairSchedulableBuilder(val rootPool: Pool, conf: SparkConf)  
  3.     extends SchedulableBuilder with Logging {  
  4.     ....省略代碼...  
  5.      override def buildPools() {  
  6.          ...省略...  
  7.         buildDefaultPool()  
  8.      }  
  9.    
  10.      private def buildDefaultPool() {  
  11.         if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {  
  12.         val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,  
  13.             DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)  
  14.         rootPool.addSchedulable(pool)  
  15.         ......  
  16.  }  
  17. }  

 

 

 

模型如上,Fair模型的調度是兩級調度。rootPool下面管理的是其他pool,下面的pool才去管理TaskManager。

配置方式:

1)添加池子

 

添加$SPAKR_HOME/conf/fairscheduler.xml可以設置調度的多個池子,如果不設置,則默認底下只有一個defaultPool池子。

 

[html]  view plain  copy
 
  1. <?xml version="1.0"?>  
  2. <allocations>  
  3.   <pool name="default">  
  4.     <schedulingMode>FAIR</schedulingMode>  
  5.     <weight>1</weight>  
  6.     <minShare>0</minShare>  
  7.   </pool>  
  8.   <pool name="pool1">  
  9.     <schedulingMode>FAIR</schedulingMode>  
  10.     <weight>1</weight>  
  11.     <minShare>0</minShare>  
  12.   </pool>  
  13. </allocations>  

2)配置TaskSetManager與池子之間的關系

 

線程1提交了一個action,這個action觸發了一個jobId為1的job,會交給TaskSetManager1來管理。

在提交這個action之前,代碼中設置spark.scheduler.pool:

SparkContext.setLocalProperty(“spark.scheduler.pool”,”pool_name_1”)

注意這里的setLocalProperty,筆者認為應該是線程私有的對象。

如果不加設置,jobs會提交到default調度池中。由於調度池的使用是Thread級別的,只能通過具體的SparkContext來設置local屬性(即無法在配置文件中通過參數spark.scheduler.pool來設置,因為配置文件中的參數會被加載到SparkConf對象中)。所以需要使用指定調度池的話,需要在具體代碼中通過SparkContext對象sc來按照如下方法進行設置: 
sc.setLocalProperty("spark.scheduler.pool", "test") 
設置該參數后,在該thread中提交的所有job都會提交到test Pool中。 
如果接下來不再需要使用到該test調度池, 
sc.setLocalProperty("spark.scheduler.pool", null)

 

我們將不同線程提交的job給隔離到不同的池子里了

 

2. 調度算法

 

[java]  view plain  copy
 
  1. private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {  
  2.     override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {  
  3.     //最小共享,可以理解為執行需要的最小資源即CPU核數  
  4.     val minShare1 = s1.minShare  
  5.     val minShare2 = s2.minShare  
  6.     //運行的任務的數量  
  7.     val runningTasks1 = s1.runningTasks  
  8.     val runningTasks2 = s2.runningTasks  
  9. //運行中的任務的數量與最小CPU核數比較,如果小於,則說明處於飢餓狀態  
  10.     val s1Needy = runningTasks1 < minShare1  
  11.     val s2Needy = runningTasks2 < minShare2  
  12.    
  13.      //飢餓程度越大(runningTask遠小於minshare),算出來的數值越小  
  14.     val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0).toDouble  
  15.     val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble  
  16.    
  17.      //權重越高,算出來的數值越小  
  18.     val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble  
  19.     val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble  
  20.     var compare: Int = 0  
  21.    
  22.      //飢餓的優先  
  23.     if (s1Needy && !s2Needy) {  
  24.         return true  
  25.     } else if (!s1Needy && s2Needy) {  
  26.         return false  
  27.     } else if (s1Needy && s2Needy) {  
  28.         //都處於挨餓狀態則飢餓程度越大的優先  
  29.         compare = minShareRatio1.compareTo(minShareRatio2)  
  30.     } else {  
  31.         //都不挨餓,則權重高的優先  
  32.         compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)  
  33.    }  
  34.    
  35.   if (compare < 0) {//小於0時,返回true  
  36.         true  
  37.    } else if (compare > 0) {//大於0時,返回false  
  38.     false  
  39.    } else {  
  40.   //如果都一樣,那么比較名字,按照字母順序比較,不考慮長度,所以名字比較重要  
  41.     s1.name < s2.name  
  42.   }  
  43.  }  
  44. }  

上述算法總結下來就是:

1.飢餓的優先(minShare)

2.都處於挨餓狀態則飢餓程度越大的優先(running/minShare越小的優先)

3.都不挨餓,則權重程度高的優先(running/weight越小的優先)

4.算出來的值相同時,則比較名字(按照字母順序比較)

 

3. 案例分析

20核分配

三個池子hello(minshare:5/weight:15), apple(minshare:2/weight:5), pool(minshare:3/weight:1)

初始狀態:0<5                      0<2                      0<3

全部飢餓

飢餓程度  0%                       0%                       0%

按名字分配                          1

飢餓程度 0%                        1/2(50%)                  0%

按名字                                                      1

飢餓程度 0%                        1/2(50%)                  1/3(33.3%)

按飢餓程度1

飢餓程度  1/5(20%)                  1/2(50%)                  1/3(33.3%)

按飢餓程度1

飢餓程度  2/5(40%)                  1/2(50%)                  1/3(33.3%)

按飢餓程度2/5(40%)                  1/2(50%)                  2/3(66.7%)

按飢餓程度3/5(60%)                  1/2(50%)                  2/3(66.7%)

按飢餓程度3/5(60%)                  2/2(100%)                  2/3(66.7%)

按飢餓程度4/5(80%)                  2/2(100%)                  2/3(66.7%)

按飢餓程度4/5(80%)                  2/2(100%)                  3/3(100%)

按飢餓     5/5(100%)                 2/2(100%)                  3/3(100%)

此時已經分配10個核

全部不飢餓,權重程度 5/15(33.3%)     2/5(40%)                     3/1(300%)

按權重程度6/15(40%)                 2/5(40%)                     3/1(300%)

按名字     6/15(40%)                 3/5(60%)                     3/1(300%)

按權重程度 7/15(46.7%)                3/5(60%)                     3/1(300%)

按權重程度 8/15(53.3%)                3/5(60%)                     3/1(300%)

按權重程度 9/15(60%)                 3/5(60%)                     3/1(300%)

按名字     9/15(60%)                 4/5(80%)                     3/1(300%)

按權重程度 10/15(66.7%)               4/5(80%)                     3/1(300%)

按權重程度 11/15(73.3%)               4/5(80%)                     3/1(300%)

按權重程度 12/15(80%)                4/5(80%)                     3/1(300%)

按名字     12/15(80%)                5/5(100%)                     3/1(300%)

此時20核全部分配完

如果用戶配置一個指定調度池權重為2, 那么這個調度池將會獲得相對於權重為1的調度池2倍的資源

4. 池子內部的調度

第一小層是Pool(資源池)間的公平調度,第二小層是Pool內的。注意,Pool內部調度默認是FIFO的,需要設置{spark_base_dir}/conf/fairscheduler.xml,針對具體的Pool設置調度規則

<pool name="default">

    <schedulingMode>FAIR</schedulingMode>

    <weight>1</weight>

    <minShare>0</minShare>

</pool>

但pool內已經沒有minShare、weight了,所以筆者猜測pool內minShare全是0,weight全是1。然后也就是公平的平均分配所有的資源。

 

四、TaskSetManager內部調度

當資源已經分配給一個taskSetManager之后,再就是執行任務內部的調度邏輯。因為分配的資源是某個executor上的,每個Task又有自己prefer的節點(為了計算的本地性),他們之間可能不是完全的匹配。

例如資源executor(機器c上的)調度給了一個taskSetManager,而taskSetManager中此時只有a,b兩個task(它們prefer的節點是a,b),那么如果此時將c資源給a task,那么a可能計算就是rack(機架中的),然后很短時間內,又有一個a資源調度過來,而此時只能把它給b task。而實際上最佳的方式應該是把a資源給a task,c資源給b task。

所以這里有一個等待機制,包括以下參數:spark.locality.wait.process、spark.locality.wait.node、spark.locality.wait.rack。TaskSetManager會根據等待時間降低自己的要求。(從process本地進程---->node本地節點---->rack同機架上---->any任意匹配)。這種等待機制會帶來一定延遲,但如果這種調整有效那么也會節約很多計算時間(比如上例中,最后a上計算a task會比c上計算a task快很多)。

 

五、Thriftserver的調度

想要thriftserver達到SQL級別的公平調度,需要配置三個配置文件:yarn-site.xml、spark-defaults.conf、fairscheduler.xml。由於thriftserver的SQL沒有按照不同用戶區分多個Pool,所以其實並不能實現不同權重和minshare的設置,只能達到完全公平的分配(也就是(三)4)中提到的池子內的調度)。

 

但通過修改thriftserver的源碼,可以實現不同sql分配到不同的池子里面,就可以實現sql級別的調度了。但池子必須提前配置好。

 

轉載:https://blog.csdn.net/silviakafka/article/details/70735221


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM