1. Introduction
In the Worker actor, every LaunchExecutor message spawns one CoarseGrainedExecutorBackend process; an Executor and a CoarseGrainedExecutorBackend have a strict one-to-one relationship. In other words, however many Executor instances are started in the cluster, that is exactly how many CoarseGrainedExecutorBackend processes there are.
So how exactly are Executors allocated, and how can we control or tune how many of them get launched?
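As a concrete reference point, here is a minimal application-side sketch (assuming Spark 1.x standalone mode; the master URL, app name, and numbers are placeholders, not taken from this article) of the two settings that feed into the allocation discussed below: spark.cores.max caps the total cores the app may claim, and spark.executor.memory is the per-Executor memory request.

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch for standalone mode (Spark 1.x era); master URL and values are placeholders.
object AllocationKnobsDemo extends App {
  val conf = new SparkConf()
    .setMaster("spark://master:7077")     // hypothetical standalone Master address
    .setAppName("executor-allocation-demo")
    .set("spark.cores.max", "10")         // total cores this app may claim cluster-wide
    .set("spark.executor.memory", "2g")   // memory each Executor will request
  val sc = new SparkContext(conf)
  sc.stop()
}

Note that spark.deploy.spreadOut (default true) is read by the Master itself, so it is configured on the Master side rather than in the application's SparkConf.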
2. Driver and Executor Resource Scheduling
Below is a walkthrough of Spark's Executor allocation strategy.
We look only at the path where an Application has submitted and registered with the Master: the Master replies with RegisteredApplication and then calls schedule(), the method that allocates resources to Drivers and launches Executors.
schedule() schedules whatever resources are currently available: it manages resource allocation for the apps still waiting in the queue, and it is invoked every time the cluster's resources change, so allocation is always based on the cluster's latest state. Drivers are scheduled first, then Executors.
Driver resource scheduling:
// First schedule drivers, they take strict precedence over applications
val shuffledWorkers = Random.shuffle(workers) // randomly shuffle the workers HashSet
for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { // iterate over the ALIVE workers
  for (driver <- waitingDrivers) { // every Driver still in the waiting queue gets a chance at resources
    if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { // launch only if this worker's free memory and cores cover what the driver asks for
      launchDriver(worker, driver) // internally this sends a LaunchDriver command to the chosen Worker, which starts the Driver
      waitingDrivers -= driver // remove the launched Driver from the waiting queue
    }
  }
}
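As a toy illustration of this first-fit-over-shuffled-workers rule, here is a standalone sketch (not the Master's code; worker names and numbers are made up):

import scala.util.Random

// Standalone sketch of the driver placement rule: first fit over a randomly
// shuffled list of ALIVE workers. All names and numbers are made up.
object DriverPlacementDemo extends App {
  case class Worker(id: String, memoryFree: Int, coresFree: Int) // simplified stand-in for WorkerInfo

  val workers = Seq(Worker("w1", 512, 1), Worker("w2", 4096, 4), Worker("w3", 2048, 2))
  val (driverMem, driverCores) = (1024, 1) // what the driver asks for (MB, cores)

  val chosen = Random.shuffle(workers)
    .find(w => w.memoryFree >= driverMem && w.coresFree >= driverCores)
  println(chosen) // Some(Worker(w2,...)) or Some(Worker(w3,...)), depending on the shuffle
}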
Executor resource scheduling:
Two things need introducing first. The Master reads a spreadOutApps flag from its configuration:

val spreadOutApps = conf.getBoolean("spark.deploy.spreadOut", true)

and it uses a small helper, canUse, to decide whether a Worker can host an Executor for a given app:
/**
 * Can an app use the given worker? True if the worker has enough memory and we haven't already
 * launched an executor for the app on it (right now the standalone backend doesn't like having
 * two executors on the same worker).
 */
def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
  worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
}
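For example, with spark.executor.memory set to 2g, an ALIVE worker with only 1g of free memory fails canUse, and so does a worker that is already running an executor for this app, no matter how much memory it has left.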
The SpreadOut allocation strategy:
// Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
// in the queue, then the second app, etc.
if (spreadOutApps) {
  // Try to spread out each app among all the nodes, until it has all its cores
  for (app <- waitingApps if app.coresLeft > 0) { // handle every app that has not yet received all of its cores
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(canUse(app, _)).sortBy(_.coresFree).reverse // usable workers, sorted by free cores in descending order
    val numUsable = usableWorkers.length // number of usable workers, e.g. 5
    val assigned = new Array[Int](numUsable) // cores assigned per candidate worker, one slot per worker, all initialized to 0
    var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum) // cores still to assign = min(cores the app still wants (e.g. 5), total free cores across usable workers (e.g. 10))
    var pos = 0
    while (toAssign > 0) {
      if (usableWorkers(pos).coresFree - assigned(pos) > 0) { // round-robin over usable workers: does this worker still have a free core beyond what we have already assigned to it?
        toAssign -= 1
        assigned(pos) += 1 // give one more core to the worker at index pos
      }
      pos = (pos + 1) % numUsable // round-robin to the next worker that may still have resources
    }
    // Now that we've decided how many cores to give on each node, let's actually give them
    for (pos <- 0 until numUsable) {
      if (assigned(pos) > 0) { // if this worker was assigned any cores, launch one executor for the app on it
        val exec = app.addExecutor(usableWorkers(pos), assigned(pos)) // update the app's Executor bookkeeping
        launchExecutor(usableWorkers(pos), exec) // tell the chosen Worker to start the Executor
        app.state = ApplicationState.RUNNING
      }
    }
  }
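To make the round-robin concrete, here is a small self-contained simulation (not the Master's code; the worker free-core counts and the app's demand are made up) of how the assigned array ends up for an app that still wants 5 cores when three usable workers have 4, 3 and 1 free cores:

// Standalone simulation of the spreadOut core assignment above (not the real Master code).
object SpreadOutDemo extends App {
  val coresFreePerWorker = Array(4, 3, 1) // usable workers, already sorted by free cores, descending
  val coresLeftForApp    = 5              // cores the app still wants

  val assigned = new Array[Int](coresFreePerWorker.length)
  var toAssign = math.min(coresLeftForApp, coresFreePerWorker.sum)
  var pos = 0
  while (toAssign > 0) {
    if (coresFreePerWorker(pos) - assigned(pos) > 0) { // this worker still has a spare core
      toAssign -= 1
      assigned(pos) += 1
    }
    pos = (pos + 1) % coresFreePerWorker.length // round-robin to the next worker
  }
  println(assigned.mkString(", ")) // prints "2, 2, 1": the 5 cores are spread across all three workers
}

One Executor per worker would then be launched with 2, 2 and 1 cores respectively.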
The non-SpreadOut allocation strategy:

} else {
  // Pack each app into as few nodes as possible until we've assigned all its cores
  for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
    for (app <- waitingApps if app.coresLeft > 0) {
      if (canUse(app, worker)) { // simply ask whether this worker can host the app (enough free memory, no existing executor for it)
        val coresToUse = math.min(worker.coresFree, app.coresLeft) // take whatever this worker has free, up to what the app still needs
        if (coresToUse > 0) { // there is something to take
          val exec = app.addExecutor(worker, coresToUse) // record the executor and launch it right here
          launchExecutor(worker, exec)
          app.state = ApplicationState.RUNNING
        }
      }
    }
  }
}
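For contrast, here is a similarly hand-rolled simulation of the pack branch (again a sketch, not the real code, using the same made-up numbers): the app drains each worker it visits before moving on.

// Standalone simulation of the non-spreadOut ("pack") branch, using the same
// made-up numbers as the spreadOut demo above.
object PackDemo extends App {
  val coresFreePerWorker = Array(4, 3, 1) // free cores per worker, in visiting order
  var coresLeft = 5                       // cores the app still wants

  val assigned = coresFreePerWorker.map { free =>
    val take = math.min(free, coresLeft)  // grab as many cores as this worker offers, up to what is left
    coresLeft -= take
    take
  }
  println(assigned.mkString(", ")) // prints "4, 1, 0": the app is packed onto just two workers
}

So the same 5-core request that was spread 2/2/1 above is now packed 4/1, leaving the third worker untouched.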
3. Summary
2. For any given app, each Worker can host at most one Executor for that app; keep this in mind. If you want more Executors for the app overall, raise SPARK_WORKER_INSTANCES (e.g. in spark-env.sh) so that each machine runs more Workers.
3. There are two strategies for allocating Executor resources:
3.1. SpreadOut: iterate over all usable Workers in the cluster in round-robin fashion and hand out cores, launching Executors as it goes. The benefit is that cores are spread across as many nodes as possible, maximizing load balancing and parallelism.
3.2. Non-SpreadOut: greedily consume each Worker's remaining resources to launch Executors, so the app's Executors may end up on only a small subset of the cluster's machines. This is acceptable on clusters with few nodes, but on a large cluster neither Executor parallelism nor load balancing across machines is guaranteed.
This was written in a hurry; if anything is wrong, please point it out, and discussion is welcome :)
Addendum:
1. On the question of why, per app, a Worker is allowed only one Executor for that app, and why it was designed this way, the discussion went as follows:
連城404: Spark uses a thread-level parallelism model, so why would a worker need to start multiple executors for one app?
朴動_zju: One executorbackend per worker was carried over from the Mesos setup, where it is also one executorbackend per slave. I think starting several is doable, but it does not seem to buy much and adds complexity.
CrazyJvm: @CodingCat wrote a patch that can start multiple, but it has not been merged yet. Looked at from the YARN angle, one Worker could map to multiple executorbackends, just as one NodeManager maps to multiple containers. @OopsOutOfMemory
OopsOutOfMemory: Reply to @連城404: if a single executor is too large and holds too many objects, GC becomes very slow; a few more executors mitigate the slow full GC problem. See this post http://t.cn/RP1bVO4 (today 11:25)
連城404: Reply to @OopsOutOfMemory: ah, that consideration makes sense. One workaround is to deploy multiple workers on a single machine; workers are relatively cheap.
JerryLead: Reply to @OopsOutOfMemory: it looks like this is all still in flux, and standalone and YARN still differ in many ways; let's not draw conclusions yet. (today 11:35)
JerryLead: The question is getting more complicated: do we raise thread-level parallelism or process-level parallelism? I think Spark still favors the former, since tasks are easier to manage and broadcast and cache are more efficient that way. The latter has its merits too, but parameter tuning gets more complicated; each has its pros and cons. (today 11:40)
To be continued...
Further reading: @JerryLead https://github.com/JerryLead/SparkInternals/blob/master/markdown/1-Overview.md
——EOF——
This is an original article; when reposting, please credit the source: http://blog.csdn.net/oopsoom/article/details/38763985