spark-3.0 application 調度算法解析


Spark 各個版本的 Application 調度算法還是有著明顯的不同之處的。從 Spark 1.3.0 到 Spark 1.6.1、Spark 2.0,再到現在最新的 Spark 3.0,調度算法有了一定的修改。下面大家一起學習一下最新的 Spark 版本 Spark 3.0 的 Application 調度機制。

/**
 * Walks the waiting applications in queue order (a simple FIFO scheduler: first app in the
 * queue, then the second, and so on). For every application that still needs at least one
 * executor's worth of cores, it selects the usable workers, asks scheduleExecutorsOnWorkers
 * how many cores to take from each of them, and then allocates those cores.
 */
private def startExecutorsOnWorkers(): Unit = {
  waitingApps.foreach { app =>
    // Cores per executor as requested by the application; 1 when it was not specified.
    val requiredCores = app.desc.coresPerExecutor.getOrElse(1)
    // If the cores left are fewer than one executor's share, they are not allocated at all.
    if (app.coresLeft >= requiredCores) {
      // Keep only ALIVE workers that can still host an executor of this app,
      // ordered by free cores, largest first.
      val candidates = workers.toArray
        .filter(w => w.state == WorkerState.ALIVE && canLaunchExecutor(w, app.desc))
        .sortBy(_.coresFree)
        .reverse
      if (waitingApps.length == 1 && candidates.isEmpty) {
        logWarning(s"App ${app.id} requires more resource than any of Workers could have.")
      }
      // Decide how many cores each candidate contributes; spreadOutApps selects whether the
      // executors are spread across workers or packed onto as few workers as possible.
      val coresPerWorker = scheduleExecutorsOnWorkers(app, candidates, spreadOutApps)

      // The per-worker core counts are fixed now, so hand them out worker by worker.
      candidates.indices.foreach { idx =>
        val granted = coresPerWorker(idx)
        if (granted > 0) {
          allocateWorkerResourceToExecutors(
            app, granted, app.desc.coresPerExecutor, candidates(idx))
        }
      }
    }
  }
}
判斷一個 worker 是否可以啟動 executor:
/**
 * Checks whether `worker` can host one executor of the application described by `desc`,
 * by forwarding the application's per-executor memory, cores and resource requirements
 * to canLaunch.
 */
private def canLaunchExecutor(worker: WorkerInfo, desc: ApplicationDescription): Boolean = {
  val memoryNeeded = desc.memoryPerExecutorMB
  // An application that does not state its cores per executor is treated as needing 1 core.
  val coresNeeded = desc.coresPerExecutor.getOrElse(1)
  val resourcesNeeded = desc.resourceReqsPerExecutor
  canLaunch(worker, memoryNeeded, coresNeeded, resourcesNeeded)
}
讓我們看一看裡面的 canLaunch 方法:
/**
 * Tells whether `worker` currently has enough free capacity for a request.
 *
 * @param worker               the worker whose free memory, cores and resources are inspected
 * @param memoryReq            requested memory, compared against `worker.memoryFree`
 * @param coresReq             requested cores, compared against `worker.coresFree`
 * @param resourceRequirements requested resources, checked against `worker.resourcesAmountFree`
 * @return true only when all three checks pass
 */
private def canLaunch(
    worker: WorkerInfo,
    memoryReq: Int,
    coresReq: Int,
    resourceRequirements: Seq[ResourceRequirement]): Boolean = {
  // Free memory on the worker must be at least the requested amount.
  val memoryFits = worker.memoryFree >= memoryReq
  // Free cores on the worker must be at least the requested number.
  val coresFit = worker.coresFree >= coresReq
  // The worker's free resources must satisfy every requirement.
  val resourcesFit =
    ResourceUtils.resourcesMeetRequirements(worker.resourcesAmountFree, resourceRequirements)
  Seq(memoryFits, coresFit, resourcesFit).forall(identity)
}

回到上面的 scheduleExecutorsOnWorkers
/**
 * Decides how many cores of each usable worker are given to `app`.
 *
 * The total handed out is capped by the smaller of `app.coresLeft` and the sum of free cores
 * over `usableWorkers`. Cores are handed out in steps of `minCoresPerExecutor`
 * (the application's coresPerExecutor, or 1 when it is not set).
 *
 * When coresPerExecutor is not set, at most one new executor is counted per worker and every
 * step adds cores to that executor. When it is set, every step counts as one more executor on
 * the worker, and memory, resources and the application's executor limit are re-checked.
 *
 * @param app           the application being scheduled
 * @param usableWorkers the workers that may receive executors
 * @param spreadOutApps if true, take one step per worker per pass and move on to the next
 *                      worker; if false, keep taking steps on the same worker until it can
 *                      no longer accept one
 * @return an array parallel to `usableWorkers` holding the number of cores assigned to each
 */
private def scheduleExecutorsOnWorkers(
app: ApplicationInfo,
usableWorkers: Array[WorkerInfo],
spreadOutApps: Boolean): Array[Int] = {
val coresPerExecutor = app.desc.coresPerExecutor
val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
// One-executor-per-worker mode is on exactly when the application did not set coresPerExecutor.
// If coresPerExecutor is set, a worker may be assigned several executors of this application.
val oneExecutorPerWorker = coresPerExecutor.isEmpty
val memoryPerExecutor = app.desc.memoryPerExecutorMB
val resourceReqsPerExecutor = app.desc.resourceReqsPerExecutor
val numUsable = usableWorkers.length
val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)

// Returns whether the worker at index `pos` can take one more step of minCoresPerExecutor
// cores, given what has already been assigned to it in this scheduling round.
def canLaunchExecutorForApp(pos: Int): Boolean = {

val keepScheduling = coresToAssign >= minCoresPerExecutor
val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
val assignedExecutorNum = assignedExecutors(pos)

// If we allow multiple executors per worker, then we can always launch new executors.
// Otherwise, if there is already an executor on this worker, just give it more cores.

// A new executor is being launched when coresPerExecutor is set, or when this worker has
// not yet been assigned an executor for this application in this round.
val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutorNum == 0
// Launching a new executor: memory, resources and the executor limit must be checked too.
if (launchingNewExecutor) {
val assignedMemory = assignedExecutorNum * memoryPerExecutor
val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
// Resource amounts already claimed by the executors assigned to this worker in this round.
val assignedResources = resourceReqsPerExecutor.map {
req => req.resourceName -> req.amount * assignedExecutorNum
}.toMap
// What would remain free on the worker after subtracting those claimed amounts.
val resourcesFree = usableWorkers(pos).resourcesAmountFree.map {
case (rName, free) => rName -> (free - assignedResources.getOrElse(rName, 0))
}
val enoughResources = ResourceUtils.resourcesMeetRequirements(
resourcesFree, resourceReqsPerExecutor)
// Executors assigned in this round plus those the app already has must stay below the limit.
val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
keepScheduling && enoughCores && enoughMemory && enoughResources && underLimit
} else {
// We're adding cores to an existing executor, so no need
// to check memory and executor limits
keepScheduling && enoughCores
}
}

// Keep launching executors until no more workers can accommodate any
// more executors, or if we have reached this application's limits

var freeWorkers = (0 until numUsable).filter(canLaunchExecutorForApp)
while (freeWorkers.nonEmpty) {
freeWorkers.foreach { pos =>
var keepScheduling = true
while (keepScheduling && canLaunchExecutorForApp(pos)) {
coresToAssign -= minCoresPerExecutor
assignedCores(pos) += minCoresPerExecutor

// If we are launching one executor per worker, then every iteration assigns 1 core
// to the executor. Otherwise, every iteration assigns cores to a new executor.
if (oneExecutorPerWorker) {
// Single executor on this worker: it just grew by minCoresPerExecutor (1 in this mode),
// so the executor count stays at 1.
assignedExecutors(pos) = 1
} else {
// coresPerExecutor is set: this step of minCoresPerExecutor cores counts as one more
// executor on this worker.
assignedExecutors(pos) += 1
}

// Spreading out an application means spreading out its executors across as
// many workers as possible. If we are not spreading out, then we should keep
// scheduling executors on this worker until we use all of its resources.
// Otherwise, just move on to the next worker.
if (spreadOutApps) {
// Stop after a single step on this worker so the next worker in freeWorkers gets its
// turn; the outer while loop starts another pass over the workers that remain eligible.
keepScheduling = false
}
}
}
// Drop the workers that can no longer accept a step before the next pass.
freeWorkers = freeWorkers.filter(canLaunchExecutorForApp)
}
// Number of cores assigned to each worker, indexed like usableWorkers.
assignedCores
}

再來分析 allocateWorkerResourceToExecutors
/**
 * Turns the cores assigned on one worker into executors for `app`.
 *
 * @param app              the application receiving the executors
 * @param assignedCores    number of cores of `worker` given to this application
 * @param coresPerExecutor cores per executor if the application specified it
 * @param worker           the worker the executors are created on
 */
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // With an explicit cores-per-executor value, the assigned cores are split into as many
  // executors of that size as fit (integer division, remainder unused). Without it, a
  // single executor takes every assigned core.
  val (executorCount, coresEach) = coresPerExecutor match {
    case Some(perExecutor) => (assignedCores / perExecutor, perExecutor)
    case None => (1, assignedCores)
  }
  (1 to executorCount).foreach { _ =>
    val acquired = worker.acquireResources(app.desc.resourceReqsPerExecutor)
    // Register one more executor with the application.
    val executor = app.addExecutor(worker, coresEach, acquired)
    // Pass the new executor to launchExecutor (defined elsewhere) for this worker.
    launchExecutor(worker, executor)
    app.state = ApplicationState.RUNNING
  }
}
ok,至此,spark最新版本 spark-3.0的Application 調度算法分析完畢!!!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM