Part 2: Source Code Analysis of the Spark Worker Launching a Driver
case LaunchDriver(driverId, driverDesc) => {
  logInfo(s"Asked to launch driver $driverId")
  // Launch the driver through a proxy: a DriverRunner manages it on the Worker's behalf.
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  // Register the new DriverRunner in the drivers HashMap: the key is the driverId,
  // the value is the DriverRunner object, identifying the current DriverRunner.
  drivers(driverId) = driver
  driver.start()
  // Once the driver has started, record the cores and memory it uses.
  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
}
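For reference, the drivers registry mentioned in the comment above is a mutable map kept by the Worker; a one-line sketch of such a declaration (using scala.collection.mutable.HashMap):

import scala.collection.mutable.HashMap

// driverId -> the DriverRunner that manages that driver
val drivers = new HashMap[String, DriverRunner]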
Note: if a driver in cluster mode fails to start or later crashes, and supervise is set to true in its DriverDescription (e.g. via spark-submit's --supervise flag in standalone cluster mode), it is restarted automatically, with the Worker responsible for the relaunch.
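To make the supervise semantics concrete, here is a minimal, hypothetical sketch of such a relaunch loop; runWithSupervision and buildProcess are invented names, and Spark's real logic lives inside DriverRunner:

// Hypothetical sketch: relaunch the child process until it exits cleanly,
// standing in for the Worker-side supervision of a crashed driver.
def runWithSupervision(buildProcess: () => Process, supervise: Boolean): Int = {
  var exitCode = -1
  var keepTrying = true
  while (keepTrying) {
    val process = buildProcess()             // start the driver as a child process
    exitCode = process.waitFor()             // block until the process exits
    keepTrying = supervise && exitCode != 0  // retry only on an abnormal exit
  }
  exitCode
}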
The DriverRunner object
private[deploy] class DriverRunner(
    conf: SparkConf,
    val driverId: String,
    val workDir: File,
    val sparkHome: File,
    val driverDesc: DriverDescription,
    val worker: RpcEndpointRef,
    val workerUrl: String,
    val securityManager: SecurityManager)
  extends Logging {
This is DriverRunner's constructor, which carries the configuration the driver needs at launch time. The class encapsulates a start method that spawns a new thread to launch and manage the driver.
/** Starts a thread to run and manage the driver. */
private[worker] def start() = {
  // A plain Java thread is used to launch the driver asynchronously.
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      try {
        // Create the driver's working directory.
        val driverDir = createWorkingDirectory()
        // Download the user's jar from HDFS (jars submitted to the cluster are stored there).
        val localJarFilename = downloadUserJar(driverDir)

        def substituteVariables(argument: String): String = argument match {
          case "{{WORKER_URL}}" => workerUrl
          case "{{USER_JAR}}" => localJarFilename
          case other => other
        }

        // TODO: If we add ability to submit multiple jars they should also be added here
        // Launch the driver through a ProcessBuilder.
        val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
          driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
        launchDriver(builder, driverDir, driverDesc.supervise)
      }
      catch {
        case e: Exception => finalException = Some(e)
      }

      val state =
        if (killed) {
          DriverState.KILLED
        } else if (finalException.isDefined) {
          DriverState.ERROR
        } else {
          finalExitCode match {
            case Some(0) => DriverState.FINISHED
            case _ => DriverState.FAILED
          }
        }

      finalState = Some(state)

      // If an exception occurs during launch, a message is sent to the Worker.
      worker.send(DriverStateChanged(driverId, state, finalException))
    }
  }.start()
}
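downloadUserJar itself is not shown above; as a rough illustration of what fetching the jar from HDFS involves, here is a hypothetical sketch using Hadoop's FileSystem API (downloadUserJarSketch and its parameters are assumed names, not Spark's implementation):

import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hypothetical sketch, not Spark's actual downloadUserJar: fetch the user jar
// from HDFS into the driver's working directory and return its local path.
def downloadUserJarSketch(jarUrl: String, driverDir: File): String = {
  val jarPath = new Path(jarUrl)
  val fs = FileSystem.get(jarPath.toUri, new Configuration())
  val localJar = new File(driverDir, jarPath.getName)
  fs.copyToLocalFile(jarPath, new Path(localJar.getAbsolutePath))
  localJar.getAbsolutePath
}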
As the run method shows, the first step is creating the driver's working directory:
/**
 * Creates the working directory for this driver.
 * Will throw an exception if there are errors preparing the directory.
 */
private def createWorkingDirectory(): File = {
  val driverDir = new File(workDir, driverId)
  if (!driverDir.exists() && !driverDir.mkdirs()) {
    throw new IOException("Failed to create directory " + driverDir)
  }
  driverDir
}
Next, the driver is launched through a ProcessBuilder:
def buildProcessBuilder(
    command: Command,
    securityMgr: SecurityManager,
    memory: Int,
    sparkHome: String,
    substituteArguments: String => String,
    classPaths: Seq[String] = Seq[String](),
    env: Map[String, String] = sys.env): ProcessBuilder = {
  val localCommand = buildLocalCommand(
    command, securityMgr, substituteArguments, classPaths, env)
  val commandSeq = buildCommandSeq(localCommand, memory, sparkHome)
  val builder = new ProcessBuilder(commandSeq: _*)
  val environment = builder.environment()
  for ((key, value) <- localCommand.environment) {
    environment.put(key, value)
  }
  builder
}
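launchDriver then drives the returned builder like any java.lang.ProcessBuilder. As a minimal, self-contained illustration of that API (the command and directory below are examples only, not what Spark actually runs):

import java.io.File

// Example only: start a child process in a working directory and wait for it.
val builder = new ProcessBuilder("java", "-version")
builder.directory(new File("/tmp"))   // working directory for the child process
builder.redirectErrorStream(true)     // merge stderr into stdout
val process = builder.start()         // fork the child process
val exitCode = process.waitFor()      // block until it exits
println(s"process exited with code $exitCode")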
What remains is exception handling, which is just Java's standard exception-handling mechanism. Note that if the launch fails, a DriverStateChanged message is sent to the Worker (and relayed to the Master) to report that the driver's state has changed.
case class DriverStateChanged(
    driverId: String,
    state: DriverState,
    exception: Option[Exception])
  extends DeployMessage
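On the receiving side, the Worker handles this message by relaying the state to the Master and releasing the driver's resources. A simplified sketch of that bookkeeping (condensed, not the verbatim Spark code):

case DriverStateChanged(driverId, state, exception) =>
  // Relay the driver's final state to the Master.
  sendToMaster(DriverStateChanged(driverId, state, exception))
  // Remove the finished driver from the registry and give back its resources.
  val driver = drivers.remove(driverId).get
  coresUsed -= driver.driverDesc.cores
  memoryUsed -= driver.driverDesc.mem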
Part 3: Source Code Analysis of the Worker Launching an Executor
The process by which the Worker launches an Executor is essentially the same as launching a Driver; in essence, the Driver is just an Executor on the Worker (in cluster mode, of course). The source code is attached here without further elaboration; a sketch of the ExecutorStateChanged message it sends follows the code.
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      // Create local dirs for the executor. These are passed to the executor via the
      // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
      // application finishes.
      val appLocalDirs = appDirectories.get(appId).getOrElse {
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          val appDir = Utils.createDirectory(dir, namePrefix = "executor")
          Utils.chmod700(appDir)
          appDir.getAbsolutePath()
        }.toSeq
      }
      appDirectories(appId) = appLocalDirs
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        workerUri,
        conf,
        appLocalDirs,
        ExecutorState.RUNNING)
      executors(appId + "/" + execId) = manager
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    } catch {
      case e: Exception => {
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None))
      }
    }
  }
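For completeness, the ExecutorStateChanged message sent above mirrors DriverStateChanged. Judging from the call sites above, its shape is roughly as follows (a sketch, not copied verbatim from DeployMessages):

case class ExecutorStateChanged(
    appId: String,
    execId: Int,
    state: ExecutorState.Value,
    message: Option[String],
    exitStatus: Option[Int])
  extends DeployMessage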