一.Spark心跳概述
前面兩節中介紹了Spark RPC的基本知識,以及深入剖析了Spark RPC中一些源碼的實現流程。
具體可以看這里:
這一節我們來看看一個Spark RPC中的運用實例--Spark的心跳機制。當然這次主要還是從代碼的角度來看。

我們首先要知道Spark的心跳有什么用。心跳是分布式技術的基礎,我們知道在Spark中,是有一個Master和眾多的Worker,那么Master怎么知道每個Worker的情況呢,這就需要借助心跳機制了。心跳除了傳輸信息,另一個主要的作用就是Worker告訴Master它還活着,當心跳停止時,方便Master進行一些容錯操作,比如數據轉移備份等等。
與之前講Spark RPC一樣,我們同樣分成兩部分來分析Spark的心跳機制,分為服務端(Spark Context)和客戶端(Executor)。
二. Spark心跳服務端heartbeatReceiver解析
我們可以發現,SparkContext中有關於心跳的類以及RpcEndpoint注冊代碼。
class SparkContext(config: SparkConf) extends Logging {
......
private var _heartbeatReceiver: RpcEndpointRef = _
......
//向 RpcEnv 注冊 Endpoint。
_heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
......
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
......
}
這里rpcEnv已經在上下文中創建好,通過setupEndpoint向rpcEnv注冊一個心跳的Endpoint。還記得上一節中HelloworldServer的例子嗎,在setupEndpoint方法中,會去調用Dispatcher創建這個Endpoint(這里就是HeartbeatReceiver)對應的Inbox和EndpointRef,然后在Inbox監聽是否有新消息,有新消息則處理它。注冊完會返回一個EndpointRef(注意這里有Refer,即是客戶端,用來發送消息的)。
所以這一句
_heartbeatReceiver = env.rpcEnv.setupEndpoint(HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
就已經完成了心跳服務端監聽的功能。
那么這條代碼的作用呢?
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
這里我們要看上面那句val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode),它會根據master url創建SchedulerBackend和TaskScheduler。這兩個類都是和資源調度有關的,所以需要借助心跳機制來傳送消息。其中TaskScheduler負責任務調度資源分配,SchedulerBackend負責與Master、Worker通信收集Worker上分配給該應用使用的資源情況。
這里主要是告訴HeartbeatReceiver(心跳)的監聽端,告訴它TaskScheduler這個東西已經設置好啦。HeartbeatReceiver就會回應你說好的,我知道的,並持有這個TaskScheduler。
到這里服務端heartbeatReceiver就差不多完了,我們可以發現,HeartbeatReceiver除了向RpcEnv注冊並監聽消息之外,還會去持有一些資源調度相關的類,比如TaskSchedulerIsSet。
三. Spark心跳客戶端發送心跳解析
發送心跳發送在Worker,每個Worker都會有一個Executor,所以我們可以發現在Executor中發送心跳的代碼。
private[spark] class Executor(
executorId: String,
executorHostname: String,
env: SparkEnv,
userClassPath: Seq[URL] = Nil,
isLocal: Boolean = false)
extends Logging {
......
// must be initialized before running startDriverHeartbeat()
//創建心跳的 EndpointRef
private val heartbeatReceiverRef = RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
......
startDriverHeartbeater()
......
/**
* Schedules a task to report heartbeat and partial metrics for active tasks to driver.
* 用一個 task 來報告活躍任務的信息以及發送心跳。
*/
private def startDriverHeartbeater(): Unit = {
val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
// Wait a random interval so the heartbeats don't end up in sync
val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
val heartbeatTask = new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
}
//heartbeater是一個單線程線程池,scheduleAtFixedRate 是定時執行任務用的,和 schedule 類似,只是一些策略不同。
heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
}
......
}
可以看到,在Executor中會創建心跳的EndpointRef,變量名為heartbeatReceiverRef。
然后我們主要看startDriverHeartbeater()這個方法,它是關鍵。
我們可以看到最后部分代碼
val heartbeatTask = new Runnable() {
override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
}
heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
heartbeatTask是一個Runaable,即一個線程任務。scheduleAtFixedRate則是java concurrent包中用來執行定時任務的一個類,這里的意思是每隔10s跑一次heartbeatTask中的線程任務,超時時間30s。
為什么到這里還是沒看到heartbeatReceiverRef呢,說好的發送心跳呢?別急,其實在heartbeatTask線程任務中又調用了另一個方法,我們到里面去一探究竟。
private[spark] class Executor(
executorId: String,
executorHostname: String,
env: SparkEnv,
userClassPath: Seq[URL] = Nil,
isLocal: Boolean = false)
extends Logging {
......
private def reportHeartBeat(): Unit = {
// list of (task id, accumUpdates) to send back to the driver
val accumUpdates = new ArrayBuffer[(Long, Seq[AccumulatorV2[_, _]])]()
val curGCTime = computeTotalGcTime()
for (taskRunner <- runningTasks.values().asScala) {
if (taskRunner.task != null) {
taskRunner.task.metrics.mergeShuffleReadMetrics()
taskRunner.task.metrics.setJvmGCTime(curGCTime - taskRunner.startGCTime)
accumUpdates += ((taskRunner.taskId, taskRunner.task.metrics.accumulators()))
}
}
val message = Heartbeat(executorId, accumUpdates.toArray, env.blockManager.blockManagerId)
try {
//終於看到 heartbeatReceiverRef 的身影了
val response = heartbeatReceiverRef.askWithRetry[HeartbeatResponse](
message, RpcTimeout(conf, "spark.executor.heartbeatInterval", "10s"))
if (response.reregisterBlockManager) {
logInfo("Told to re-register on heartbeat")
env.blockManager.reregister()
}
heartbeatFailures = 0
} catch {
case NonFatal(e) =>
logWarning("Issue communicating with driver in heartbeater", e)
heartbeatFailures += 1
if (heartbeatFailures >= HEARTBEAT_MAX_FAILURES) {
logError(s"Exit as unable to send heartbeats to driver " +
s"more than $HEARTBEAT_MAX_FAILURES times")
System.exit(ExecutorExitCode.HEARTBEAT_FAILURE)
}
}
}
......
}
可以看到,這里heartbeatReceiverRef和我們上一節的例子,HelloworldClient類似,核心也是調用了askWithRetry()方法,這個方法是通過同步的方式發送Rpc消息。而這個方法里其他代碼其實就是獲取task的信息啊,或者是一些容錯處理。核心就是調用askWithRetry()方法來發送消息。
看到這你就明白了吧。Executor初始化便會用一個定時任務不斷發送心跳,同時當有task的時候,會獲取task的信息一並發送。這就是心跳的大概內容了。
總的來說Spark心跳的代碼也是比較雜的,不過這些也都是為了讓設計更加高耦合,低內聚,讓這些代碼更加方便得復用。不過通過層層剖析,我們還是發現其實它底層就是我們之前說到的Spark RPC框架的內容!!
OK,Spark RPC三部曲完畢。如果你能看到這里那不容易呀,給自己點個贊吧!!
推薦閱讀 :
從分治算法到 MapReduce
大數據存儲的進化史 --從 RAID 到 Hadoop Hdfs
一個故事告訴你什么才是好的程序員
