Feel free to repost this article; please credit the original source. 徽滬一郎.
Prologue
Part 2 of this Spark source code walkthrough series mentioned that Spark can run a cluster in Standalone mode, but did not analyze in detail how an application is submitted and actually executed. This article examines those questions closely and also explains how HA is achieved in standalone mode.
Standalone mode without HA
Let us start with the simpler case. "Without HA" here means that the Master node has no HA.
A cluster is made up of two kinds of elements: Master and Worker. There can be one or more slave workers, and all of them are in the active state.
The driver application can run either inside or outside the cluster. We begin with the simpler case, in which the driver application runs independently of the cluster. The overall framework then looks like the figure below: the runtime environment consists of the driver, the master, and multiple slave workers.

Execution sequence
Step 1: start the master
$SPARK_HOME/sbin/start-master.sh
The key line in start-master.sh is
"$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT
Check the Master JVM process:
root 23438 1 67 22:57 pts/0 00:00:05 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip localhost --port 7077 --webui-port 8080
The Master logs are under the $SPARK_HOME/logs directory.
Step 2: start one or more workers
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077
When a worker starts, it needs to register with the specified master URL, here spark://localhost:7077.
On the Master side, the RegisterWorker message is handled by the following code:
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
{
  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
    workerHost, workerPort, cores, Utils.megabytesToString(memory)))
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else if (idToWorker.contains(id)) {
    sender ! RegisterWorkerFailed("Duplicate worker ID")
  } else {
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      sender, workerUiPort, publicAddress)
    if (registerWorker(worker)) {
      persistenceEngine.addWorker(worker)
      sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
      schedule()
    } else {
      val workerAddress = worker.actor.path.address
      logWarning("Worker registration failed. Attempted to re-register worker at same " +
        "address: " + workerAddress)
      sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
        + workerAddress)
    }
  }
}
Step 3: run spark-shell
MASTER=spark://localhost:7077 $SPARK_HOME/bin/spark-shell
spark-shell is an application; the run logs of applications are stored under the $SPARK_HOME/work directory.
As an application, spark-shell is handled on the Master side by the RegisterApplication branch; the code is as follows:
case RegisterApplication(description) => {
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, sender)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)
    sender ! RegisteredApplication(app.id, masterUrl)
    schedule()
  }
}
Whenever a new application registers with the master, the master invokes the schedule function to dispatch the application to suitable workers, and the corresponding ExecutorBackend is launched on each of those workers. For the details, see the schedule function in Master.scala; the code is not reproduced here.
Step 4: check the result
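Spark's actual schedule function is more involved (it also handles waiting drivers and a spreadOut flag), but the core idea of spreading an application's remaining core demand round-robin across workers that still have free cores can be sketched roughly as follows. This is an illustrative model only; ScheduleSketch and assignCores are made-up names, not Spark code.

```java
import java.util.*;

// Simplified model of the "spreadOut" core assignment in Master.schedule():
// hand out one core at a time, round-robin, to workers with free capacity,
// until the app's demand or the cluster's free cores are exhausted.
public class ScheduleSketch {
    static int[] assignCores(int coresWanted, int[] freeCores) {
        int[] assigned = new int[freeCores.length];
        int toAssign = Math.min(coresWanted, Arrays.stream(freeCores).sum());
        int pos = 0;
        while (toAssign > 0) {
            if (freeCores[pos] - assigned[pos] > 0) {
                assigned[pos]++;
                toAssign--;
            }
            pos = (pos + 1) % freeCores.length;
        }
        return assigned;
    }

    public static void main(String[] args) {
        // An app wants 5 cores; three workers have 4, 2 and 4 free cores.
        System.out.println(Arrays.toString(assignCores(5, new int[]{4, 2, 4})));
    }
}
```

Spreading executors across many workers this way improves data locality for HDFS-backed jobs, which is why it is the default behavior.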
/opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.master.Master --ip localhost --port 7077 --webui-port 8080
root 23752 23745 21 23:00 pts/0 00:00:25 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.repl.Main
root 23986 23938 25 23:02 pts/2 00:00:03 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Dspark.akka.logLifecycleEvents=true -Djava.library.path= -Xms512m -Xmx512m org.apache.spark.deploy.worker.Worker spark://localhost:7077
root 24047 23986 34 23:02 pts/2 00:00:04 /opt/java/bin/java -cp :/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar -Xms512M -Xmx512M org.apache.spark.executor.CoarseGrainedExecutorBackend akka.tcp://spark@localhost:40053/user/CoarseGrainedScheduler 0 localhost 4 akka.tcp://sparkWorker@localhost:53568/user/Worker app-20140511230059-0000
From the relationships among the running processes we can see that once the worker-master connection is established, whenever a new driver application connects to the master, the master asks the worker to launch a corresponding ExecutorBackend process. Any tasks submitted afterwards run on these executors. This can be concluded from the log lines below, or, of course, by reading the source.
14/05/11 23:02:36 INFO Worker: Asked to launch executor app-20140511230059-0000/0 for Spark shell
14/05/11 23:02:36 INFO ExecutorRunner: Launch command: "/opt/java/bin/java" "-cp" ":/root/working/spark-0.9.1-bin-hadoop2/conf:/root/working/spark-0.9.1-bin-hadoop2/assembly/target/scala-2.10/spark-assembly_2.10-0.9.1-hadoop2.2.0.jar" "-Xms512M" "-Xmx512M" "org.apache.spark.executor.CoarseGrainedExecutorBackend" "akka.tcp://spark@localhost:40053/user/CoarseGrainedScheduler" "0" "localhost" "4" "akka.tcp://sparkWorker@localhost:53568/user/Worker" "app-20140511230059-0000"
The worker-side source for launching the executor is in the worker's receive function; the relevant code is as follows:
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
      val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
        self, workerId, host,
        appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
        workDir, akkaUrl, ExecutorState.RUNNING)
      executors(appId + "/" + execId) = manager
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      masterLock.synchronized {
        master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
      }
    } catch {
      case e: Exception => {
        logError("Failed to launch exector %s/%d for %s".format(appId, execId, appDesc.name))
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        masterLock.synchronized {
          master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
        }
      }
    }
  }
For standalone deployment, the source files worth studying in detail are:
- deploy/master/Master.scala
- deploy/worker/Worker.scala
- executor/CoarseGrainedExecutorBackend.scala
To inspect the parent-child relationships among these processes, use "pstree".
The figure below summarizes the single-Master deployment.

Dynamic class loading and reflection
Before discussing deploying the driver onto the cluster, let us first review one of Java's major features: dynamic class loading and reflection. I did not come up writing Java code, so much of this was learned on the job, and omissions are inevitable.
Reflection, in essence, is about loading classes dynamically at run time.
Here is a simple example:
package test;

public class Demo {
    public Demo() {
        System.out.println("Hi!");
    }

    @SuppressWarnings("unchecked")
    public static void main(String[] args) throws Exception {
        Class clazz = Class.forName("test.Demo");
        Demo demo = (Demo) clazz.newInstance();
    }
}
This naturally brings to mind an interview question: "Explain the difference between Class.forName and ClassLoader.loadClass." Speaking of interviews, I never feel very confident; interviewers are always formidable. :)
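A minimal sketch of the classic answer: ClassLoader.loadClass only loads the class, while Class.forName (with its default arguments) also initializes it, i.e. runs its static initializers. The LoadDemo and Target classes below are made up for illustration.

```java
// Demonstrates that loadClass does not run static initializers,
// while Class.forName does.
public class LoadDemo {
    static boolean targetInitialized = false;

    public static class Target {
        // Runs only when the class is *initialized*, not when merely loaded.
        static { LoadDemo.targetInitialized = true; }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader cl = LoadDemo.class.getClassLoader();
        cl.loadClass("LoadDemo$Target");
        System.out.println("after loadClass: initialized = " + targetInitialized); // false
        Class.forName("LoadDemo$Target");
        System.out.println("after forName:   initialized = " + targetInitialized); // true
    }
}
```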
Running the Driver Application inside the cluster
The previous section on dynamic class loading and reflection was written precisely to lay the groundwork for this one.
When the driver application is deployed into the cluster, the startup sequence looks roughly like the figure below.

- First start the Master, then the Workers
- Use "deploy.Client" to submit the driver application to the cluster
./bin/spark-class org.apache.spark.deploy.Client launch
   [client-options] \
   <cluster-url> <application-jar-url> <main-class> \
   [application-options]
- Upon receiving the RegisterDriver request, the Master sends LaunchDriver to a worker, asking it to start a JVM process for the driver
- When the driver application begins running in the newly spawned JVM process, it registers with the master by sending RegisterApplication
- The Master sends LaunchExecutor to workers, asking them to start JVM processes running ExecutorBackend
- Once the ExecutorBackends are up, the driver application can submit tasks to them for execution, via the LaunchTask message
For the submission-side code, see deploy/Client.scala:
driverArgs.cmd match {
  case "launch" =>
    // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
    // truncate filesystem paths similar to what YARN does. For now, we just require
    // people call `addJar` assuming the jar is in the same directory.
    val env = Map[String, String]()
    System.getenv().foreach{case (k, v) => env(k) = v}

    val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

    val classPathConf = "spark.driver.extraClassPath"
    val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
      cp.split(java.io.File.pathSeparator)
    }

    val libraryPathConf = "spark.driver.extraLibraryPath"
    val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
      cp.split(java.io.File.pathSeparator)
    }

    val javaOptionsConf = "spark.driver.extraJavaOptions"
    val javaOpts = sys.props.get(javaOptionsConf)

    val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
      driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)

    val driverDescription = new DriverDescription(
      driverArgs.jarUrl,
      driverArgs.memory,
      driverArgs.cores,
      driverArgs.supervise,
      command)

    masterActor ! RequestSubmitDriver(driverDescription)
The receiving side
Who receives the message sent from deploy.Client? The answer is fairly obvious: the Master. The receive function in Master.scala has a dedicated branch for RequestSubmitDriver; the code is as follows:
case RequestSubmitDriver(description) => {
  if (state != RecoveryState.ALIVE) {
    val msg = s"Can only accept driver submissions in ALIVE state. Current state: $state."
    sender ! SubmitDriverResponse(false, None, msg)
  } else {
    logInfo("Driver submitted " + description.command.mainClass)
    val driver = createDriver(description)
    persistenceEngine.addDriver(driver)
    waitingDrivers += driver
    drivers.add(driver)
    schedule()

    // TODO: It might be good to instead have the submission client poll the master to determine
    // the current status of the driver. For now it's simply "fire and forget".

    sender ! SubmitDriverResponse(true, Some(driver.id),
      s"Driver successfully submitted as ${driver.id}")
  }
}
SparkEnv
SparkEnv is critical to the whole Spark job. Different roles pass different parameters when creating their SparkEnv; in particular, the Driver and the Executor differ in important ways.
In Executor.scala, the SparkEnv is created as follows:
private val env = {
  if (!isLocal) {
    val _env = SparkEnv.create(conf, executorId, slaveHostname, 0,
      isDriver = false, isLocal = false)
    SparkEnv.set(_env)
    _env.metricsSystem.registerSource(executorSource)
    _env
  } else {
    SparkEnv.get
  }
}
The driver application creates a SparkContext, and an important step during SparkContext construction is generating the SparkEnv, as follows:
private[spark] val env = SparkEnv.create(
  conf,
  "<driver>",
  conf.get("spark.driver.host"),
  conf.get("spark.driver.port").toInt,
  isDriver = true,
  isLocal = isLocal,
  listenerBus = listenerBus)
SparkEnv.set(env)
HA in standalone mode
In standalone mode, Spark uses ZooKeeper to implement an HA mechanism. The HA discussed here targets the Master node specifically, because, as all the analysis above shows, the Master is the only possible single point of failure in the cluster.
With ZooKeeper in place, the cluster is composed as shown in the figure below.

To use ZooKeeper, the Master must be started with extra parameters: edit conf/spark-env.sh and add the following options to SPARK_DAEMON_JAVA_OPTS.
| System property | Meaning |
| --- | --- |
| spark.deploy.recoveryMode | Set to ZOOKEEPER to enable standby Master recovery mode (default: NONE). |
| spark.deploy.zookeeper.url | The ZooKeeper cluster url (e.g., 192.168.1.100:2181,192.168.1.101:2181). |
| spark.deploy.zookeeper.dir | The directory in ZooKeeper to store recovery state (default: /spark). |
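As a concrete sketch, the resulting conf/spark-env.sh fragment could look like the following; the ZooKeeper host:port values are the example addresses from the table above and should be replaced with your actual quorum.

```shell
# Hypothetical conf/spark-env.sh fragment for Master HA.
SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS \
  -Dspark.deploy.recoveryMode=ZOOKEEPER \
  -Dspark.deploy.zookeeper.url=192.168.1.100:2181,192.168.1.101:2181 \
  -Dspark.deploy.zookeeper.dir=/spark"
```

The same options must be set on every Master instance, so that a standby Master can recover the persisted workers, drivers, and applications from ZooKeeper when it is elected leader.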
How HA is implemented
ZooKeeper provides a leader election mechanism, and HA can be built on top of it; see the ZooKeeper recipes for details.
Spark does not call the ZooKeeper API directly; instead it uses Curator, which wraps ZooKeeper and is friendlier to use.
Summary
Step by step, we have arrived at how ZooKeeper is used to implement HA in standalone mode. Along the way it became clear that the standalone master's principal jobs are resource management and job scheduling. Seeing these two responsibilities, you may well think: isn't that exactly the problem YARN is meant to solve? Indeed, in essence standalone is a simplified version of YARN.
The next article in this series will examine in detail how Spark is deployed on YARN.
References
- Spark Standalone Mode http://spark.apache.org/docs/latest/spark-standalone.html
- Cluster Mode Overview http://spark.apache.org/docs/latest/cluster-overview.html
