Apache Spark源碼走讀之15 -- Standalone部署模式下的容錯性分析


歡迎轉載,轉載請注明出處,徽滬一郎。

概要

本文就standalone部署方式下的容錯性問題做比較細致的分析,主要回答standalone部署方式下的包含哪些主要節點,當某一類節點出現問題時,系統是如何處理的。

Standalone部署的節點組成

介紹Spark的資料中對於RDD這個概念涉及的比較多,但對於RDD如何運行起來,如何對應到進程和線程的,着墨的不是很多。

在實際的生產環境中,Spark總是會以集群的方式進行運行的,其中standalone的部署方式是所有集群方式中最為精簡的一種,另外是Mesos和YARN,要理解其內部運行機理,顯然要花更多的時間才能了解清楚。

standalone cluster的組成

standalone集群由三個不同級別的節點組成,分別是

  • Master 主控節點,可以類比為董事長或總舵主,在整個集群之中,最多只有一個Master處在Active狀態
  • Worker 工作節點 ,這個是manager,是分舵主, 在整個集群中,可以有多個worker,如果worker為零,什么事也做不了
  • Executor 干苦力活的,直接受worker掌控,一個worker可以啟動多個executor,啟動的個數受限於機器中的cpu核數

這三種不同類型的節點各自運行於自己的JVM進程之中

Driver Application

提交到standalone集群的應用程序稱之為Driver Applicaton。

Standalone集群啟動及任務提交過程詳解

 

上圖總結了正常情況下Standalone集群的啟動以及應用提交時,各節點之間有哪些消息交互。下面分集群啟動和應用提交兩個過程來作詳細說明。

集群啟動過程

正常啟動過程如下所述

step 1: 啟動master

$SPARK_HOME/sbin/start-master.sh

step 2: 啟動worker

./bin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077

worker啟動之后,會做兩件事情

  1. 將自己注冊到Master, RegisterWorker
  2. 定期發送心跳消息給Master

任務提交過程

step 1: 提交application

利用如下指令來啟動spark-shell

MASTER=spark://127.0.0.1:7077 $SPARK_HOME/bin/spark-shell

運行spark-shell時,會向Master發送RegisterApplication請求

日志位置: master運行產生的日志在$SPARK_HOME/logs目錄下

step 2: Master處理RegisterApplication的請求之后

收到RegisterApplication請求之后,Mastet會做如下處理

  1. 如果有worker已經注冊上來,發送LaunchExecutor指令給相應worker
  2. 如果沒有,則什么事也不做

step 3: 啟動Executor

Worker在收到LaunchExecutor指令之后,會啟動Executor進程

step 4: 注冊Executor

啟動的Executor進程會根據啟動時的入參,將自己注冊到Driver中的SchedulerBackend

日志位置: executor的運行日志在$SPARK_HOME/work目錄下

step 5: 運行Task

SchedulerBackend收到Executor的注冊消息之后,會將提交到的Spark Job分解為多個具體的Task,然后通過LaunchTask指令將這些Task分散到各個Executor上真正的運行

如果在調用runJob的時候,沒有任何的Executor注冊到SchedulerBackend,相應的處理邏輯是什么呢?

  1. SchedulerBackend會將Task存儲在TaskManager中
  2. 一旦有Executor注冊上來,就將TaskManager管理的尚未運行的task提交到executor中
  3. 如果有多個job處於pending狀態,默認調度策略是FIFO,即先提交的先運行

測試步驟

  1. 啟動Master
  2. 啟動spark-shell
  3. 執行 sc.textFile("README.md").count
  4. 啟動worker
  5. 注意worker啟動之后,spark-shell中打印出來的日志消息

Job執行結束

任務運行結束時,會將相應的Executor停掉。

可以做如下的試驗

  1. 停止spark-shell
  2. 利用ps -ef|grep -i java查看java進程,可以發現CoarseGrainedExecutorBackend進程已經退出

小結

通過上面的控制消息原語之間的先后順序可以看出

  1. Master和worker進程必須顯式啟動
  2. executor是被worker隱式的帶起
  3. 集群的啟動順序
    1. Master必須先於其它節點啟動
    2. worker和driver哪個先啟動,無所謂
    3. 但driver提交的job只有在有相應的worker注冊到Master之后才可以被真正的執行

異常場景分析

 上面說明的是正常情況下,各節點的消息分發細節。那么如果在運行中,集群中的某些節點出現了問題,整個集群是否還能夠正常處理Application中的任務呢?

異常分析1: worker異常退出

在Spark運行過程中,經常碰到的問題就是worker異常退出,當worker退出時,整個集群會有哪些故事發生呢? 請看下面的具體描述

  1. worker異常退出,比如說有意識的通過kill指令將worker殺死
  2. worker在退出之前,會將自己所管控的所有小弟executor全干掉
  3. worker需要定期向master改善心跳消息的,現在worker進程都已經玩完了,哪有心跳消息,所以Master會在超時處理中意識到有一個“分舵”離開了
  4. Master非常傷心,傷心的Master將情況匯報給了相應的Driver
  5. Driver通過兩方面確認分配給自己的Executor不幸離開了,一是Master發送過來的通知,二是Driver沒有在規定時間內收到Executor的StatusUpdate,於是Driver會將注冊的Executor移除

后果分析

worker異常退出會帶來哪些影響

  1. executor退出導致提交的task無法正常結束,會被再一次提交運行
  2. 如果所有的worker都異常退出,則整個集群不可用
  3. 需要有相應的程序來重啟worker進程,比如使用supervisordrunit

測試步驟

  • 啟動Master
  • 啟動worker
  • 啟動spark-shell
  • 手工kill掉worker進程
  • 用jps或ps -ef|grep -i java來查看啟動着的java進程

異常退出的代碼處理

定義於ExecutorRunner.scala的start函數

def start() {
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run() { fetchAndRunExecutor() }
    }
    workerThread.start()
    // Shutdown hook that kills actors on shutdown.
    shutdownHook = new Thread() {
      override def run() {
        killProcess(Some("Worker shutting down"))
      }
    }
    Runtime.getRuntime.addShutdownHook(shutdownHook)
  }

killProcess的過程就是停止相應CoarseGrainedExecutorBackend的過程。

worker停止的時候,一定要先將自己啟動的Executor停止掉。這是不是很像水滸中宋江的手段,李逵就是這樣不明不白的把命給丟了。

小結

需要特別指出的是,當worker在啟動Executor的時候,是通過ExecutorRunner來完成的,ExecutorRunner是一個獨立的線程,和Executor是一對一的關系,這很重要。Executor作為一個獨立的進程在運行,但會受到ExecutorRunner的嚴密監控。

異常分析2: executor異常退出

Executor作為Standalone集群部署方式下的最底層員工,一旦異常退出,其后果會是什么呢?

  1. executor異常退出,ExecutorRunner注意到異常,將情況通過ExecutorStateChanged匯報給Master
  2. Master收到通知之后,非常不高興,盡然有小弟要跑路,那還了得,要求Executor所屬的worker再次啟動
  3. Worker收到LaunchExecutor指令,再次啟動executor

作為一名底層員工,想輕易摞挑子不干是不成的。"人在江湖,身不由己“啊。

測試步驟

  • 啟動Master
  • 啟動Worker
  • 啟動spark-shell
  • 手工kill掉CoarseGrainedExecutorBackend

fetchAndRunExecutor

fetchAndRunExecutor負責啟動具體的Executor,並監控其運行狀態,具體代碼邏輯如下所示

def fetchAndRunExecutor() {
    try {
      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      // Launch the process
      val command = getCommandSeq
      logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))
      val builder = new ProcessBuilder(command: _*).directory(executorDir)
      val env = builder.environment()
      for ((key, value)  {
        logInfo("Runner thread for executor " + fullId + " interrupted")
        state = ExecutorState.KILLED
        killProcess(None)
      }
      case e: Exception => {
        logError("Error running executor", e)
        state = ExecutorState.FAILED
        killProcess(Some(e.toString))
      }
    }
  }

異常分析3: master 異常退出

 

worker和executor異常退出的場景都講到了,我們剩下最后一種情況了,master掛掉了怎么辦?

帶頭大哥如果不在了,會是什么后果呢?

  • worker沒有匯報的對象了,也就是如果executor再次跑飛,worker是不會將executor啟動起來的,大哥沒給指令
  • 無法向集群提交新的任務
  • 老的任務即便結束了,占用的資源也無法清除,因為資源清除的指令是Master發出的

怎么樣,知道后果很嚴重了吧?別看老大平時不干活,要真的不在,僅憑小弟們是不行的。

Master單點失效問題的解決

那么怎么解決Master單點失效的問題呢?

你說再加一個Master就是了,兩個老大。兩個老大如果同時具有指揮權,結果也將是災難性的。設立一個副職人員,當目前的正職掛掉之后,副職接管。也就是同一時刻,有且只有一個active master。

注意不錯,如何實現呢?使用zookeeper的ElectLeader功能,效果圖如下

配置細節

如何搭建zookeeper集群,這里不再廢話,哪天有空的話再整一整,或者可以參考寫的storm系列中談到的zookeeper的集群安裝步驟。

假設zookeeper集群已經設置成功,那么如何啟動standalone集群中的節點呢?有哪些特別的地方?

conf/spark-env.sh

 在conf/spark-env.sh中,為SPARK_DAEMON_JAVA_OPTS添加如下選項

System property Meaning
spark.deploy.recoveryMode Set to ZOOKEEPER to enable standby Master recovery mode (default: NONE).
spark.deploy.zookeeper.url The ZooKeeper cluster url (e.g., 192.168.1.100:2181,192.168.1.101:2181).
spark.deploy.zookeeper.dir The directory in ZooKeeper to store recovery state (default: /spark).

設置SPARK_DAEMON_JAVA_OPTS的實際例子

SPARK_DAEMON_JAVA_OPTS="$SPARK_DAEMON_JAVA_OPTS -Dspark.deploy.recoveryMode=ZOOKEEPER"

 應用程序啟動

應用程序運行的時候,指定多個master地址,用逗號分開,如下所示

MASTER=spark://192.168.100.101:7077,spark://192.168.100.102:7077 bin/spark-shell

小結

Standalone集群部署方式下的容錯性分析讓我們對於Spark的任務分發過程又有了進一處的認識。前面的篇章從整體上匆匆過了一遍Spark所涉及的知識點,分析的不夠深,不夠細。

此篇嘗試着就某一具體問題做深入的分析。套用書畫中的說法,在框架分析的時候,我們可以大開大合,疏可走馬,計白當黑,在細節分析的時候,又要做到“密不透風,條分縷析,層層遞進”。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM