Reposting is welcome; please credit the source. -- 徽滬一郎
Overview
This article looks at how resources (mainly CPU cores and memory) are requested and released over the lifetime of a Spark application under the standalone cluster deployment mode.
A standalone cluster deployment is made up of four components: Master, Worker, Executor and Driver, each running in its own JVM process.
From the perspective of resource management:
- Master: oversees the resources of the whole cluster, mainly CPU cores and memory, but does not itself own any of them (see the WorkerInfo sketch after this list)
- Worker: the actual contributor of compute resources; it reports its CPU cores and memory to the Master and launches executors at the Master's direction
- Executor: the workhorse that performs the actual computation; the Master decides how many cores and how much memory each executor process gets
- Driver: the actual consumer of resources; the Driver submits one or more jobs, each job is split into tasks, and those tasks are dispatched to the executors for execution
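To make "the Master tracks resources it does not own" concrete: for every registered worker the Master only keeps a WorkerInfo record with counters. A minimal sketch of that bookkeeping (simplified from the real class, which carries more fields):
class WorkerInfo(
    val id: String,
    val host: String,
    val port: Int,
    val cores: Int,    // totals reported by the worker at registration time
    val memory: Int) { // in MB
  // running counters of what has already been handed out to executors
  var coresUsed: Int = 0
  var memoryUsed: Int = 0
  // what schedule() may still allocate on this worker
  def coresFree: Int = cores - coresUsed
  def memoryFree: Int = memory - memoryUsed
}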
Some of this was touched on in the fault-tolerance analysis of standalone cluster mode; this article focuses on how resources, once allocated, are reclaimed in the various scenarios.
Resource reporting and aggregation
In standalone cluster mode the Master is of course the central component; it must be up and running before any Worker or Driver program starts.
Once the Master has started successfully, the Workers can be launched. At startup each Worker registers with the Master, and the registration message carries the CPU cores and memory of that Worker node.
The call sequence is preStart -> registerWithMaster -> tryRegisterAllMasters.
Let's look at the code of tryRegisterAllMasters:
def tryRegisterAllMasters() {
  for (masterUrl <- masterUrls) {
    logInfo("Connecting to master " + masterUrl + "...")
    val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
    actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
  }
}
The question is: where do the memory and cores arguments of the RegisterWorker message come from?
Note that the main function of Worker creates a WorkerArguments instance:
def main(argStrings: Array[String]) {
  SignalLogger.register(log)
  val args = new WorkerArguments(argStrings)
  val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
    args.memory, args.masters, args.workDir)
  actorSystem.awaitTermination()
}
By default, memory is obtained through inferDefaultMemory and cores through inferDefaultCores:
def inferDefaultCores(): Int = {
  Runtime.getRuntime.availableProcessors()
}

def inferDefaultMemory(): Int = {
  val ibmVendor = System.getProperty("java.vendor").contains("IBM")
  var totalMb = 0
  try {
    val bean = ManagementFactory.getOperatingSystemMXBean()
    if (ibmVendor) {
      val beanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean")
      val method = beanClass.getDeclaredMethod("getTotalPhysicalMemory")
      totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
    } else {
      val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean")
      val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
      totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
    }
  } catch {
    case e: Exception => {
      totalMb = 2 * 1024
      System.out.println("Failed to get total physical memory. Using " + totalMb + " MB")
    }
  }
  // Leave out 1 GB for the operating system, but don't return a negative memory size
  math.max(totalMb - 1024, 512)
}
If the cores and memory of each worker have been explicitly specified in the configuration, those values are used instead; the relevant settings are SPARK_WORKER_CORES and SPARK_WORKER_MEMORY.
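For reference, a rough sketch of how WorkerArguments applies these overrides on top of the inferred defaults (simplified; the real code also handles command-line options):
// in WorkerArguments (sketch): environment variables override the inferred defaults
var cores = inferDefaultCores()
var memory = inferDefaultMemory()
if (System.getenv("SPARK_WORKER_CORES") != null) {
  cores = System.getenv("SPARK_WORKER_CORES").toInt
}
if (System.getenv("SPARK_WORKER_MEMORY") != null) {
  memory = Utils.memoryStringToMb(System.getenv("SPARK_WORKER_MEMORY"))
}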
After receiving the RegisterWorker message, the Master creates a corresponding WorkerInfo for each worker from the reported information.
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
{
  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
    workerHost, workerPort, cores, Utils.megabytesToString(memory)))
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else if (idToWorker.contains(id)) {
    sender ! RegisterWorkerFailed("Duplicate worker ID")
  } else {
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      sender, workerUiPort, publicAddress)
    if (registerWorker(worker)) {
      persistenceEngine.addWorker(worker)
      sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
      schedule()
    } else {
      val workerAddress = worker.actor.path.address
      logWarning("Worker registration failed. Attempted to re-register worker at same " +
        "address: " + workerAddress)
      sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
        + workerAddress)
    }
  }
}
Resource allocation
If one or more Driver Applications have already registered by the time a worker registers, executors need to be launched for the applications that are still waiting for resources.
The WorkerInfo records are used in the schedule function, whose logic can be summarized as follows:
- Check whether a currently alive worker has enough free memory to satisfy the application's per-executor minimum requirement; if so, add that worker to the set of workers whose resources may be allocated (this is the canUse check, see the sketch after this list)
- Depending on the distribution policy: if the work is to be spread evenly across workers, take one core at a time from each worker in turn, until either the allocatable resources are exhausted or the driver's demand is met
- If the policy is to use as few workers as possible, grab as many allocatable cores as possible from one worker at a time, until the driver's core demand is satisfied
- Based on the result of step 2 or 3, add the corresponding executor on each chosen worker; this is handled by addExecutor
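The check in the first step is the Master's canUse helper; it looks roughly like this (memoryPerSlave is the per-executor memory requested by the application):
// A worker qualifies for an app if it has enough free memory for one executor
// and is not already running an executor of that app.
def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
  worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
}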
For brevity, only one of the two branches is listed below, the one that packs each application onto as few workers as possible:
for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
  for (app <- waitingApps if app.coresLeft > 0) {
    if (canUse(app, worker)) {
      val coresToUse = math.min(worker.coresFree, app.coresLeft)
      if (coresToUse > 0) {
        val exec = app.addExecutor(worker, coresToUse)
        launchExecutor(worker, exec)
        app.state = ApplicationState.RUNNING
      }
    }
  }
}
launchExecutor is responsible for two things:
- Record the CPU cores and memory consumed by the newly added executor; the bookkeeping happens in worker.addExecutor (see the sketch after this list)
- Send the LaunchExecutor command to the worker
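The bookkeeping half is tiny: in WorkerInfo it amounts to charging the executor against the counters introduced earlier (sketch):
// WorkerInfo.addExecutor (simplified): deduct the executor's cores and memory
// from this worker's allocatable pool.
def addExecutor(exec: ExecutorInfo) {
  executors(exec.fullId) = exec
  coresUsed += exec.cores
  memoryUsed += exec.memory
}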
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
After receiving the LaunchExecutor command, the worker does its own bookkeeping, deducting the cores and memory about to be used from its available resources, and then uses an ExecutorRunner to spawn the Executor. Note that the Executor runs in a separate process. The code is as follows:
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
      val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
        self, workerId, host,
        appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
        workDir, akkaUrl, conf, ExecutorState.RUNNING)
      executors(appId + "/" + execId) = manager
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      masterLock.synchronized {
        master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
      }
    } catch {
      case e: Exception => {
        logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        masterLock.synchronized {
          master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
        }
      }
    }
  }
One thing to note about allocation: if multiple Driver Applications are waiting, resources are allocated FIFO, first come first served.
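The FIFO behaviour simply falls out of how waiting applications are stored: registration appends to an ordered buffer and schedule walks it front to back. A minimal sketch, with field names as in the Master:
// applications wait in arrival order; schedule() iterates waitingApps in that order,
// so earlier applications get their cores first
val waitingApps = new ArrayBuffer[ApplicationInfo]
def registerApplication(app: ApplicationInfo) {
  // ... id/actor/address bookkeeping omitted ...
  waitingApps += app
}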
Resource reclamation
The resources reported by the workers are ultimately consumed by the tasks of the jobs submitted by a driver application. When the application terminates (whether normally or abnormally), the resources it occupied should be reclaimed, i.e. returned to the pool of allocatable resources.
The question then becomes: how do the Master and the Executors learn that a Driver Application has exited?
There are two different cases: saying goodbye before leaving, and leaving without notice. Let's look at each in turn.
"Saying goodbye before leaving" means the driver application explicitly notifies the Master and the Executors that its work is done and it is about to exit; concretely, the application explicitly calls SparkContext.stop:
def stop() {
  postApplicationEnd()
  ui.stop()
  // Do this only if not stopped already - best case effort.
  // prevent NPE if stopped more than once.
  val dagSchedulerCopy = dagScheduler
  dagScheduler = null
  if (dagSchedulerCopy != null) {
    metadataCleaner.cancel()
    cleaner.foreach(_.stop())
    dagSchedulerCopy.stop()
    taskScheduler = null
    // TODO: Cache.stop()?
    env.stop()
    SparkEnv.set(null)
    ShuffleMapTask.clearCache()
    ResultTask.clearCache()
    listenerBus.stop()
    eventLogger.foreach(_.stop())
    logInfo("Successfully stopped SparkContext")
  } else {
    logInfo("SparkContext already stopped")
  }
}
One major effect of calling SparkContext.stop explicitly is that the Executors are explicitly stopped; the code that issues the StopExecutor command lives in the stop function of CoarseGrainedSchedulerBackend:
override def stop() {
  stopExecutors()
  try {
    if (driverActor != null) {
      val future = driverActor.ask(StopDriver)(timeout)
      Await.ready(future, timeout)
    }
  } catch {
    case e: Exception =>
      throw new SparkException("Error stopping standalone scheduler's driver actor", e)
  }
}
How, then, does the Master learn that the Driver Application has exited? This is where Akka's remoting comes in: when either side of a connection goes away, the other side receives a DisassociatedEvent, and it is while handling this event that the Master removes the Driver Application that has stopped.
case DisassociatedEvent(_, address, _) => {
  // The disconnected client could've been either a worker or an app; remove whichever it was
  logInfo(s"$address got disassociated, removing it.")
  addressToWorker.get(address).foreach(removeWorker)
  addressToApp.get(address).foreach(finishApplication)
  if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
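finishApplication is where the accounting is reversed. A simplified sketch of what happens, based on the Master's removeApplication and WorkerInfo.removeExecutor (logging, persistence and retry details omitted):
// every executor of the finished application hands its cores and memory back
// to its worker, and that worker is told to kill the executor process
def removeApplication(app: ApplicationInfo, state: ApplicationState.Value) {
  for (exec <- app.executors.values) {
    exec.worker.removeExecutor(exec)
    exec.worker.actor ! KillExecutor(masterUrl, exec.application.id, exec.id)
  }
  waitingApps -= app
  schedule()  // the freed resources are immediately available to other waiting applications
}

// in WorkerInfo: the mirror image of addExecutor
def removeExecutor(exec: ExecutorInfo) {
  if (executors.contains(exec.fullId)) {
    executors -= exec.fullId
    coresUsed -= exec.cores
    memoryUsed -= exec.memory
  }
}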
In the "leaving without notice" case, how does an Executor learn that the application it serves has completed its mission? The same way the Master does: through a DisassociatedEvent. See the receive function of CoarseGrainedExecutorBackend:
case x: DisassociatedEvent =>
  logError(s"Driver $x disassociated! Shutting down.")
  System.exit(1)
Resource reclamation in abnormal cases
Thanks to the heartbeat mechanism between the Master and the Workers, if a worker exits abnormally the Master detects its demise through missed heartbeats and removes the resources that worker had reported.
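A rough sketch of that detection, based on the Master's timeOutDeadWorkers (the periodic trigger message and the exact timeout constant vary between versions):
// run periodically on the Master: a worker that has not sent a heartbeat within
// WORKER_TIMEOUT milliseconds is considered dead; removing it also takes its
// cores and memory out of the allocatable pool
def timeOutDeadWorkers() {
  val currentTime = System.currentTimeMillis()
  val toRemove = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
  for (worker <- toRemove if worker.state != WorkerState.DEAD) {
    logWarning("Removing worker %s because we got no heartbeat in %d seconds".format(
      worker.id, WORKER_TIMEOUT / 1000))
    removeWorker(worker)
  }
}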
When an Executor exits abnormally, the ExecutorRunner monitoring thread in the Worker notices immediately and reports it to the Master; the Master reclaims the resources and asks the worker to launch a new executor.
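A simplified sketch of how the Master reacts to that report, based on its ExecutorStateChanged handler (error counting and the retry limit are omitted):
// the ExecutorRunner reports the failure via ExecutorStateChanged; the Master
// returns the executor's cores and memory to the worker and calls schedule()
// so that a replacement executor can be launched
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  for (app <- idToApp.get(appId); exec <- app.executors.get(execId)) {
    exec.state = state
    exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
    if (ExecutorState.isFinished(state)) {
      app.removeExecutor(exec)         // application-side bookkeeping
      exec.worker.removeExecutor(exec) // worker-side bookkeeping: cores/memory freed
      schedule()                       // try to start a replacement executor
    }
  }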