Spark Executor Driver資源調度小結【轉】


一、引子

  在Worker Actor中,每次LaunchExecutor會創建一個CoarseGrainedExecutorBackend進程,Executor和CoarseGrainedExecutorBackend是1對1的關系。也就是說集群里啟動多少Executor實例就有多少CoarseGrainedExecutorBackend進程。

  那么到底是如何分配Executor的呢?怎么控制調節Executor的個數呢?

 二、Driver和Executor資源調度

   下面主要介紹一下Spark Executor分配策略:

   我們僅看,當Application提交注冊到Master后,Master會返回RegisteredApplication,之后便會調用schedule()這個方法,來分配Driver的資源,和啟動Executor的資源。

schedule()方法是來調度當前可用資源的調度方法,它管理還在排隊等待的Apps資源的分配,這個方法是每次在集群資源發生變動的時候都會調用,根據當前集群最新的資源來進行Apps的資源分配。

Driver資源調度:

  隨機的將Driver分配到空閑的Worker上去,詳細流程請看我寫的注釋 :)

 

[java]  view plain  copy
 
  1. // First schedule drivers, they take strict precedence over applications  
  2. val shuffledWorkers = Random.shuffle(workers) // 把當前workers這個HashSet的順序隨機打亂  
  3. for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { //遍歷活着的workers  
  4.   for (driver <- waitingDrivers) { //在等待隊列中的Driver們會進行資源分配  
  5.     if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { //當前的worker內存和cpu均大於當前driver請求的mem和cpu,則啟動  
  6.       launchDriver(worker, driver) //啟動Driver 內部實現是發送啟動Driver命令給指定Worker,Worker來啟動Driver。  
  7.       waitingDrivers -= driver //把啟動過的Driver從隊列移除  
  8.     }  
  9.   }  
  10. }  

 

 

Executor資源調度:

 Spark默認提供了一種在各個節點進行round-robin的調度,用戶可以自己設置這個flag
[java]  view plain  copy
 
  1. val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)  

在介紹之前我們先介紹一個概念,
可用的Worker:什么是可用,可用就是 資源空閑足夠且滿足一定的 規則來啟動當前App的Executor。
Spark定義了一個canUse方法:這個方法接受一個ApplicationInfo的描述信息和當前Worker的描述信息。
1、 當前worker的空閑內存 比  該app在每個slave要占用的內存 (executor.memory默認512M)大 
2、當前app從未在此worker啟動過App
總結: 從這點看出,要滿足:該Worker的當前可用最小內存要比配置的executor內存大,並且對於同一個App只能在一個Worker里啟動一個Exeutor,如果要啟動第二個Executor,那么請到其它Worker里。這樣的才算是對App可用的Worker。
[java]  view plain  copy
 
  1. /** 
  2.  * Can an app use the given worker? True if the worker has enough memory and we haven't already 
  3.  * launched an executor for the app on it (right now the standalone backend doesn't like having 
  4.  * two executors on the same worker). 
  5.  */  
  6. def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {  
  7.   worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)  
  8. }  

SpreadOut分配策略:

SpreadOut分配策略是一種以round-robin方式遍歷集群所有可用Worker,分配Worker資源,來啟動創建Executor的策略,好處是盡可能的將cores分配到各個節點,最大化負載均衡和高並行。
下面看看,默認的spreadOutApps模式啟動App的過程: 
 
 1、等待分配資源的apps隊列默認是FIFO的。
 2、app.coresLeft表示的是該app還有cpu資源沒申請到:   app.coresLeft  = 當前app申請的maxcpus - granted的cpus
 3、遍歷未分配完全的apps,繼續給它們分配資源,
 4、usableWorkers =  從當前ALIVE的Workers中過濾找出上文描述的可用Worker,然后根據cpus的資源空閑,從大到小給Workers排序。
 5、當toAssign(即將要分配的的core數>0,就找到可以的Worker持續分配)
 6、當可用Worker的free cores 大於 目前該Worker已經分配的core時,再給它分配1個core,這樣分配是很平均的方法。
 7、round-robin輪詢可用的Worker循環
 8、toAssign=0時結束循環,開始根據分配策略去真正的啟動Executor。
 
舉例: 1個APP申請了6個core, 現在有2個Worker可用。
      那么: toAssign = 6,assigned = 2 
 那么就會在assigned(1)和assigned(0)中輪詢平均分配cores,以+1 core的方式,最終每個Worker分到3個core,即每個Worker的啟動一個Executor,每個Executor獲得3個cores。
[java]  view plain  copy
 
  1. // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app  
  2.     // in the queue, then the second app, etc.  
  3.     if (spreadOutApps) {  
  4.       // Try to spread out each app among all the nodes, until it has all its cores  
  5.       for (app <- waitingApps if app.coresLeft > 0) { //對還未被完全分配資源的apps處理  
  6.         val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)  
  7.           .filter(canUse(app, _)).sortBy(_.coresFree).reverse //根據core Free對可用Worker進行降序排序。  
  8.         val numUsable = usableWorkers.length //可用worker的個數 eg:可用5個worker  
  9.         val assigned = new Array[Int](numUsable) //候選Worker,每個Worker一個下標,是一個數組,初始化默認都是0  
  10.         var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)//還要分配的cores = 集群中可用Worker的可用cores總和(10), 當前未分配core(5)中找最小的  
  11.         var pos = 0  
  12.         while (toAssign > 0) {   
  13.           if (usableWorkers(pos).coresFree - assigned(pos) > 0) { //以round robin方式在所有可用Worker里判斷當前worker空閑cpu是否大於當前數組已經分配core值  
  14.             toAssign -= 1  
  15.             assigned(pos) += //當前下標pos的Worker分配1個core +1  
  16.           }  
  17.           pos = (pos + 1) % numUsable //round-robin輪詢尋找有資源的Worker  
  18.         }  
  19.         // Now that we've decided how many cores to give on each node, let's actually give them  
  20.         for (pos <- 0 until numUsable) {  
  21.           if (assigned(pos) > 0) { //如果assigned數組中的值>0,將啟動一個executor在,指定下標的機器上。  
  22.             val exec = app.addExecutor(usableWorkers(pos), assigned(pos)) //更新app里的Executor信息  
  23.             launchExecutor(usableWorkers(pos), exec)  //通知可用Worker去啟動Executor  
  24.             app.state = ApplicationState.RUNNING  
  25.           }  
  26.         }  
  27.       }  
  28.     } else {  

非SpreadOut分配策略:

非SpreadOut策略,該策略:會盡可能的根據每個Worker的剩余資源來啟動Executor,這樣啟動的Executor可能只在集群的一小部分機器的Worker上。這樣做對node較少的集群還可以,集群規模大了,Executor的並行度和機器負載均衡就不能夠保證了。
 
當用戶設定了參數 spark.deploy.spreadOut 為 false時,觸發此游戲分支 偷笑,跑個題,有些困了。。
1、遍歷可用Workers
2、且遍歷Apps
3、比較當前Worker的可用core和app還需要分配的core,取最小值當做還需要分配的core
4、如果coreToUse大於0,則直接拿可用的core來啟動Executor。。奉獻當前Worker全部資源。(Ps:挨個榨干每個Worker的剩余資源。。。。)
 
舉例: App申請12個core,3個Worker,Worker1剩余1個core, Worke2r剩7個core, Worker3剩余4個core.
這樣會啟動3個Executor,Executor1 占用1個core, Executor2占用7個core, Executor3占用4個core.
總結:這樣是盡可能的滿足App,讓其盡快執行,而忽略了其並行效率和負載均衡。
[java]  view plain  copy
 
  1. else {  
  2.      // Pack each app into as few nodes as possible until we've assigned all its cores  
  3.      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {  
  4.        for (app <- waitingApps if app.coresLeft > 0) {  
  5.          if (canUse(app, worker)) { //直接問當前worker是有空閑的core  
  6.            val coresToUse = math.min(worker.coresFree, app.coresLeft) //有則取,不管多少  
  7.            if (coresToUse > 0) { //有  
  8.              val exec = app.addExecutor(worker, coresToUse) //直接啟動  
  9.              launchExecutor(worker, exec)  
  10.              app.state = ApplicationState.RUNNING  
  11.            }  
  12.          }  
  13.        }  
  14.      }  
  15.    }  
  16.  }  

三、總結:

 1、 在Worker Actor中,每次LaunchExecutor會創建一個CoarseGrainedExecutorBackend進程,一個Executor對應一個CoarseGrainedExecutorBackend

 2、針對同一個App,每個Worker里只能有一個針對該App的Executor存在,切記。如果想讓整個App的Executor變多,設置SPARK_WORKER_INSTANCES,讓Worker變多。

 3、Executor的資源分配有2種策略:

3.1、SpreadOut :一種以round-robin方式遍歷集群所有可用Worker,分配Worker資源,來啟動創建Executor的策略,好處是盡可能的將cores分配到各個節點,最大化負載均衡和高並行。

3.2、非SpreadOut:會盡可能的根據每個Worker的剩余資源來啟動Executor,這樣啟動的Executor可能只在集群的一小部分機器的Worker上。這樣做對node較少的集群還可以,集群規模大了,Executor的並行度和機器負載均衡就不能夠保證了。

 

行文倉促,如有不正之處,請指出,歡迎討論 :)

 

補充:

1、關於:   一個App一個Worker為什么只有允許有針對該App的一個Executor 到底這樣設計為何? 的討論:

連城404:Spark是線程級並行模型,為什么需要一個worker為一個app啟動多個executor呢?

朴動_zju:一個worker對應一個executorbackend是從mesos那一套遷移過來的,mesos下也是一個slave一個executorbackend。我理解這里是可以實現起多個,但起多個貌似沒什么好處,而且增加了復雜度。

CrazyJvm@CodingCat 做了一個patch可以啟動多個,但是還沒有被merge。 從Yarn的角度考慮的話,一個Worker可以對應多個executorbackend,正如一個nodemanager對應多個container。 @OopsOutOfMemory 

OopsOutOfMemory:回復@連城404: 如果一個executor太大且裝的對象太多,會導致GC很慢,多幾個Executor會減少full gc慢的問題。 see this post http://t.cn/RP1bVO4(今天 11:25)

連城404:回復@OopsOutOfMemory:哦,這個考慮是有道理的。一個workaround是單台機器部署多個worker,worker相對來說比較廉價。 

JerryLead:回復@OopsOutOfMemory:看來都還在變化當中,standalone 和 YARN 還是有很多不同,我們暫不下結論 (今天 11:35)

JerryLead:問題開始變得復雜了,是提高線程並行度還是提高進程並行度?我想 Spark 還是優先選擇前者,這樣 task 好管理,而且 broadcast,cache 的效率高些。后者有一些道理,但參數配置會變得更復雜,各有利弊吧 (今天 11:40)

 

未完待續。。。

傳送門:@JerrLead  https://github.com/JerryLead/SparkInternals/blob/master/markdown/1-Overview.md

 

——EOF——

原創文章,轉載請注明出自:http://blog.csdn.net/oopsoom/article/details/38763985


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM