歡迎轉載,轉載請注明出處,徽滬一郎。
概要
本文就standalone部署方式下的容錯性問題做比較細致的分析,主要回答standalone部署方式下的包含哪些主要節點,當某一類節點出現問題時,系統是如何處理的。
Standalone部署的節點組成
介紹Spark的資料中對於RDD這個概念涉及的比較多,但對於RDD如何運行起來,如何對應到進程和線程的,着墨的不是很多。
在實際的生產環境中,Spark總是會以集群的方式進行運行的,其中standalone的部署方式是所有集群方式中最為精簡的一種,另外是Mesos和YARN,要理解其內部運行機理,顯然要花更多的時間才能了解清楚。
standalone cluster的組成
standalone集群由三個不同級別的節點組成,分別是
- Master 主控節點,可以類比為董事長或總舵主,在整個集群之中,最多只有一個Master處在Active狀態
- Worker 工作節點 ,這個是manager,是分舵主, 在整個集群中,可以有多個worker,如果worker為零,什么事也做不了
- Executor 干苦力活的,直接受worker掌控,一個worker可以啟動多個executor,啟動的個數受限於機器中的cpu核數
這三種不同類型的節點各自運行於自己的JVM進程之中。
Driver Application
提交到standalone集群的應用程序稱之為Driver Applicaton。
Standalone集群啟動及任務提交過程詳解
上圖總結了正常情況下Standalone集群的啟動以及應用提交時,各節點之間有哪些消息交互。下面分集群啟動和應用提交兩個過程來作詳細說明。
集群啟動過程
正常啟動過程如下所述
step 1: 啟動master
$SPARK_HOME/sbin/start-master.sh
step 2: 啟動worker
./bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077
worker啟動之后,會做兩件事情
- 將自己注冊到Master, RegisterWorker
- 定期發送心跳消息給Master
任務提交過程
step 1: 提交application
利用如下指令來啟動spark-shell
MASTER=spark://127.0.0.1:7077 $SPARK_HOME/bin/spark-shell
運行spark-shell時,會向Master發送RegisterApplication請求
日志位置: master運行產生的日志在$SPARK_HOME/logs目錄下
step 2: Master處理RegisterApplication的請求之后
收到RegisterApplication請求之后,Mastet會做如下處理
- 如果有worker已經注冊上來,發送LaunchExecutor指令給相應worker
- 如果沒有,則什么事也不做
step 3: 啟動Executor
Worker在收到LaunchExecutor指令之后,會啟動Executor進程
step 4: 注冊Executor
啟動的Executor進程會根據啟動時的入參,將自己注冊到Driver中的SchedulerBackend
日志位置: executor的運行日志在$SPARK_HOME/work目錄下
step 5: 運行Task
SchedulerBackend收到Executor的注冊消息之后,會將提交到的Spark Job分解為多個具體的Task,然后通過LaunchTask指令將這些Task分散到各個Executor上真正的運行
如果在調用runJob的時候,沒有任何的Executor注冊到SchedulerBackend,相應的處理邏輯是什么呢?
- SchedulerBackend會將Task存儲在TaskManager中
- 一旦有Executor注冊上來,就將TaskManager管理的尚未運行的task提交到executor中
- 如果有多個job處於pending狀態,默認調度策略是FIFO,即先提交的先運行
測試步驟
- 啟動Master
- 啟動spark-shell
- 執行 sc.textFile("README.md").count
- 啟動worker
- 注意worker啟動之后,spark-shell中打印出來的日志消息
Job執行結束
任務運行結束時,會將相應的Executor停掉。
可以做如下的試驗
- 停止spark-shell
- 利用ps -ef|grep -i java查看java進程,可以發現CoarseGrainedExecutorBackend進程已經退出
小結
通過上面的控制消息原語之間的先后順序可以看出
- Master和worker進程必須顯式啟動
- executor是被worker隱式的帶起
- 集群的啟動順序
- Master必須先於其它節點啟動
- worker和driver哪個先啟動,無所謂
- 但driver提交的job只有在有相應的worker注冊到Master之后才可以被真正的執行
異常場景分析
上面說明的是正常情況下,各節點的消息分發細節。那么如果在運行中,集群中的某些節點出現了問題,整個集群是否還能夠正常處理Application中的任務呢?
異常分析1: worker異常退出
在Spark運行過程中,經常碰到的問題就是worker異常退出,當worker退出時,整個集群會有哪些故事發生呢? 請看下面的具體描述
- worker異常退出,比如說有意識的通過kill指令將worker殺死
- worker在退出之前,會將自己所管控的所有小弟executor全干掉
- worker需要定期向master改善心跳消息的,現在worker進程都已經玩完了,哪有心跳消息,所以Master會在超時處理中意識到有一個“分舵”離開了
- Master非常傷心,傷心的Master將情況匯報給了相應的Driver
- Driver通過兩方面確認分配給自己的Executor不幸離開了,一是Master發送過來的通知,二是Driver沒有在規定時間內收到Executor的StatusUpdate,於是Driver會將注冊的Executor移除
后果分析
worker異常退出會帶來哪些影響
- executor退出導致提交的task無法正常結束,會被再一次提交運行
- 如果所有的worker都異常退出,則整個集群不可用
- 需要有相應的程序來重啟worker進程,比如使用supervisord或runit
測試步驟
- 啟動Master
- 啟動worker
- 啟動spark-shell
- 手工kill掉worker進程
- 用jps或ps -ef|grep -i java來查看啟動着的java進程
異常退出的代碼處理
定義於ExecutorRunner.scala的start函數
def start() {
workerThread = new Thread("ExecutorRunner for " + fullId) {
override def run() { fetchAndRunExecutor() }
}
workerThread.start()
// Shutdown hook that kills actors on shutdown.
shutdownHook = new Thread() {
override def run() {
killProcess(Some("Worker shutting down"))
}
}
Runtime.getRuntime.addShutdownHook(shutdownHook)
}
killProcess的過程就是停止相應CoarseGrainedExecutorBackend的過程。
worker停止的時候,一定要先將自己啟動的Executor停止掉。這是不是很像水滸中宋江的手段,李逵就是這樣不明不白的把命給丟了。
小結
需要特別指出的是,當worker在啟動Executor的時候,是通過ExecutorRunner來完成的,ExecutorRunner是一個獨立的線程,和Executor是一對一的關系,這很重要。Executor作為一個獨立的進程在運行,但會受到ExecutorRunner的嚴密監控。
異常分析2: executor異常退出
Executor作為Standalone集群部署方式下的最底層員工,一旦異常退出,其后果會是什么呢?
- executor異常退出,ExecutorRunner注意到異常,將情況通過ExecutorStateChanged匯報給Master
- Master收到通知之后,非常不高興,盡然有小弟要跑路,那還了得,要求Executor所屬的worker再次啟動
- Worker收到LaunchExecutor指令,再次啟動executor
作為一名底層員工,想輕易摞挑子不干是不成的。"人在江湖,身不由己“啊。
測試步驟
- 啟動Master
- 啟動Worker
- 啟動spark-shell
- 手工kill掉CoarseGrainedExecutorBackend
fetchAndRunExecutor
fetchAndRunExecutor負責啟動具體的Executor,並監控其運行狀態,具體代碼邏輯如下所示
def fetchAndRunExecutor() {
try {
// Create the executor's working directory
val executorDir = new File(workDir, appId + "/" + execId)
if (!executorDir.mkdirs()) {
throw new IOException("Failed to create directory " + executorDir)
}
// Launch the process
val command = getCommandSeq
logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
val builder = new ProcessBuilder(command: _*).directory(executorDir)
val env = builder.environment()
for ((key, value) {
logInfo("Runner thread for executor " + fullId + " interrupted")
state = ExecutorState.KILLED
killProcess(None)
}
case e: Exception => {
logError("Error running executor", e)
state = ExecutorState.FAILED
killProcess(Some(e.toString))
}
}
}
異常分析3: master 異常退出
worker和executor異常退出的場景都講到了,我們剩下最后一種情況了,master掛掉了怎么辦?
帶頭大哥如果不在了,會是什么后果呢?
- worker沒有匯報的對象了,也就是如果executor再次跑飛,worker是不會將executor啟動起來的,大哥沒給指令
- 無法向集群提交新的任務
- 老的任務即便結束了,占用的資源也無法清除,因為資源清除的指令是Master發出的
怎么樣,知道后果很嚴重了吧?別看老大平時不干活,要真的不在,僅憑小弟們是不行的。
Master單點失效問題的解決
那么怎么解決Master單點失效的問題呢?
你說再加一個Master就是了,兩個老大。兩個老大如果同時具有指揮權,結果也將是災難性的。設立一個副職人員,當目前的正職掛掉之后,副職接管。也就是同一時刻,有且只有一個active master。
注意不錯,如何實現呢?使用zookeeper的ElectLeader功能,效果圖如下
配置細節
如何搭建zookeeper集群,這里不再廢話,哪天有空的話再整一整,或者可以參考寫的storm系列中談到的zookeeper的集群安裝步驟。
假設zookeeper集群已經設置成功,那么如何啟動standalone集群中的節點呢?有哪些特別的地方?
conf/spark-env.sh
在conf/spark-env.sh中,為SPARK_DAEMON_JAVA_OPTS添加如下選項
System property | Meaning |
spark.deploy.recoveryMode | Set to ZOOKEEPER to enable standby Master recovery mode (default: NONE). |
spark.deploy.zookeeper.url | The ZooKeeper cluster url (e.g., 192.168.1.100:2181,192.168.1.101:2181). |
spark.deploy.zookeeper.dir | The directory in ZooKeeper to store recovery state (default: /spark). |
設置SPARK_DAEMON_JAVA_OPTS的實際例子
SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.deploy.recoveryMode=ZOOKEEPER"
應用程序啟動
應用程序運行的時候,指定多個master地址,用逗號分開,如下所示
MASTER=spark://192.168.100.101:7077,spark://192.168.100.102:7077 bin/spark-shell
小結
Standalone集群部署方式下的容錯性分析讓我們對於Spark的任務分發過程又有了進一處的認識。前面的篇章從整體上匆匆過了一遍Spark所涉及的知識點,分析的不夠深,不夠細。
此篇嘗試着就某一具體問題做深入的分析。套用書畫中的說法,在框架分析的時候,我們可以”大開大合,疏可走馬,計白當黑“,在細節分析的時候,又要做到“密不透風,條分縷析,層層遞進”。