Spark Study Notes (2) --- Spark Message Communication Source Code Analysis


Spark Message Communication

Spark Startup Message Communication

During Spark startup, communication happens mainly between the Master and the Workers. The message flow is as follows: a Worker node first sends a registration message to the Master; after processing it, the Master replies with either a registration-success or a registration-failure message.

The detailed process is as follows:
(1) After the Master starts, each Worker is started in turn. On startup, a Worker creates its communication environment RpcEnv and its endpoint, then sends the Master a RegisterWorker registration message. Worker.tryRegisterAllMasters is as follows:
``` scala
// There may be more than one Master
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  masterRpcAddresses.map { masterAddress =>
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          logInfo("Connecting to master " + masterAddress + "...")
          // Obtain a reference to the Master endpoint
          val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
          registerWithMaster(masterEndpoint)
        } catch {...}
      }
    })
  }
}

private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
  // Send the registration message via the Master endpoint reference
  masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
    workerId, host, port, self, cores, memory, workerWebUiUrl))
    .onComplete {
      // Handle the success or failure result of registration
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        Utils.tryLogNonFatalError { handleRegisterResponse(msg) }
      case Failure(e) =>
        logError(s"Cannot register with master: ${masterEndpoint.address}", e)
        System.exit(1)
    }(ThreadUtils.sameThread)
}
```

(2) On receiving the message, the Master validates and records the information sent by the Worker. If registration succeeds, it sends a RegisteredWorker message back to the Worker to confirm that registration is complete, and step 3 follows: the Worker starts sending periodic heartbeats to the Master. If registration fails, the Master sends a RegisterWorkerFailed message; the Worker logs the error and aborts its startup. Master.receiveAndReply is as follows:
``` scala
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case RegisterWorker(
      id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl) =>
    logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
      workerHost, workerPort, cores, Utils.megabytesToString(memory)))
    // The Master is in STANDBY state
    if (state == RecoveryState.STANDBY) {
      context.reply(MasterInStandby)
    } else if (idToWorker.contains(id)) { // This Worker is already in the registration list
      context.reply(RegisterWorkerFailed("Duplicate worker ID"))
    } else {
      val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
        workerRef, workerWebUiUrl)
      // registerWorker adds the Worker to the registration list
      if (registerWorker(worker)) {
        persistenceEngine.addWorker(worker)
        context.reply(RegisteredWorker(self, masterWebUiUrl))
        schedule()
      } else {
        val workerAddress = worker.endpoint.address
        logWarning("Worker registration failed. Attempted to re-register worker at same " +
          "address: " + workerAddress)
        context.reply(RegisterWorkerFailed("Attempted to re-register worker at same address: "
          + workerAddress))
      }
    }

  ...
}
```
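The three-way decision in receiveAndReply above (standby, duplicate ID, or successful registration) can be modeled as a small pure function. The following is a minimal sketch using hypothetical stand-in types, not Spark's actual classes:

``` scala
// Minimal sketch of the Master's decision on RegisterWorker, using
// hypothetical stand-in types (not Spark's actual classes).
object RegistrationSketch {
  sealed trait Response
  case object MasterInStandby extends Response
  final case class RegisterWorkerFailed(reason: String) extends Response
  final case class RegisteredWorker(workerId: String) extends Response

  // Returns the reply plus the updated registration list.
  def handleRegister(standby: Boolean, registered: Set[String],
      workerId: String): (Response, Set[String]) =
    if (standby) (MasterInStandby, registered)
    else if (registered.contains(workerId))
      (RegisterWorkerFailed("Duplicate worker ID"), registered)
    else (RegisteredWorker(workerId), registered + workerId)
}
```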

(3) Once registration succeeds, the Worker sends periodic Heartbeat messages to the Master so the Master can track the Worker's live state. The interval derives from spark.worker.timeout: the heartbeat is sent every one quarter of that timeout value.
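The relationship between the timeout and the heartbeat interval can be sketched in one line (HeartbeatInterval is a hypothetical helper name, not a Spark class):

``` scala
// Hypothetical helper: derive the Worker heartbeat interval from
// spark.worker.timeout (seconds). Sending four heartbeats per timeout
// window keeps a live Worker from being declared dead.
object HeartbeatInterval {
  def heartbeatMillis(workerTimeoutSeconds: Long): Long =
    workerTimeoutSeconds * 1000 / 4
}
```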

Spark Runtime Message Communication

When a user submits an application, the application's SparkContext sends an application-registration message to the Master, and the Master allocates Executors to the application; after starting, each Executor sends a registration-success message back to the SparkContext. When an action is triggered on one of the SparkContext's RDDs, the DAGScheduler divides the job into stages and converts each stage into a TaskSet; the TaskScheduler then sends launch messages to the registered Executors, which start and run the tasks on receipt. Finally, when all tasks have run, the Driver processes the results and reclaims the resources. As shown in the figure below:

The detailed process is as follows:
(1) During SparkContext creation, a SchedulerBackend object is instantiated first; in standalone mode this is actually a StandaloneSchedulerBackend. While starting, it sets up two endpoints: the DriverEndpoint inherited from its parent class, and the ClientEndpoint created by StandaloneAppClient. ClientEndpoint.tryRegisterAllMasters creates the registration thread pool registerMasterThreadPool, whose threads send the RegisterApplication message to the Master. The code is as follows:
``` scala
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  // Iterate over all Masters; this for-comprehension yields a collection
  for (masterAddress <- masterRpcAddresses) yield {
    // Start a registration thread in the pool; the thread exits once it
    // sees the registration flag registered == true
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered.get) { // private val registered = new AtomicBoolean(false)
          return
        }
        logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
        val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
        // Send the registration message
        masterRef.send(RegisterApplication(appDescription, self))
      } catch {...}
    })
  }
}
```
When the Master receives the registration message, registerApplication records the application's information and adds it to the waiting list; a RegisteredApplication success message is sent back to the ClientEndpoint, and schedule() is called, which in turn calls startExecutorsOnWorkers to run the application. The code is as follows:
``` scala
case RegisterApplication(description, driver) =>
  // TODO Prevent repeated registrations from some driver
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, driver)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    // Persist the Application with the persistence engine
    persistenceEngine.addApplication(app)
    driver.send(RegisteredApplication(app.id, self))
    schedule()
  }

private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Shuffle the Worker nodes into random order
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  // Launch the waiting Drivers in order, spreading them across different
  // Worker nodes where possible
  for (driver <- waitingDrivers.toList) {
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers()
}

private def startExecutorsOnWorkers(): Unit = {
  // Applications run FIFO: the first one registered runs first
  for (app <- waitingApps if app.coresLeft > 0) {
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // Filter out workers that don't have enough resources to launch an executor
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    // Two strategies: spreadOutApps runs the application on as many Workers
    // as possible; the alternative packs it onto as few as possible
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

    // Now that we've decided how many cores to allocate on each worker, let's allocate them
    // After each worker's share of cores is decided, walk the workers and
    // launch the executors
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}
```
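The difference between the two strategies passed to scheduleExecutorsOnWorkers can be illustrated with a simplified core-assignment loop. This is a sketch only: it assumes one core is handed out per step and ignores memory and coresPerExecutor constraints that the real method honors.

``` scala
// Simplified sketch of the two allocation strategies: spreadOut hands out
// cores round-robin across usable workers; non-spreadOut fills each worker
// before moving to the next. Hypothetical helper, not Spark's algorithm.
object CoreAssignment {
  def assign(coresFree: Array[Int], toAssign: Int, spreadOut: Boolean): Array[Int] = {
    val assigned = Array.fill(coresFree.length)(0)
    var left = toAssign
    var pos = 0
    def anyFree = assigned.indices.exists(i => assigned(i) < coresFree(i))
    while (left > 0 && anyFree) {
      if (assigned(pos) < coresFree(pos)) {
        assigned(pos) += 1
        left -= 1
      }
      // spreadOut advances after every core; otherwise only when the worker is full
      if (spreadOut || assigned(pos) == coresFree(pos)) pos = (pos + 1) % coresFree.length
    }
    assigned
  }
}
```

With three workers of four free cores each and six cores to place, spreadOut yields 2/2/2 while the packing strategy yields 4/2/0.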

(2) When StandaloneAppClient.ClientEndpoint receives the RegisteredApplication message from the Master, it sets the registration flag registered to true. The code is as follows:
``` scala
case RegisteredApplication(appId_, masterRef) =>
  appId.set(appId_)
  registered.set(true)
  master = Some(masterRef)
  listener.connected(appId.get)
```
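The registered flag this handler flips is what stops the racing registration threads seen earlier in tryRegisterAllMasters. Its behavior can be sketched in isolation; RegisteredFlag is a hypothetical name, and the compareAndSet idiom here is a common variant rather than Spark's exact get/set sequence:

``` scala
import java.util.concurrent.atomic.AtomicBoolean

// Sketch: several registration threads race, but only the first success
// flips the AtomicBoolean; later threads observe it and return without
// re-registering. Hypothetical helper, not Spark's code.
object RegisteredFlag {
  private val registered = new AtomicBoolean(false)

  // True only for the first caller; atomically flips false -> true
  def tryRegister(): Boolean = registered.compareAndSet(false, true)

  def isRegistered: Boolean = registered.get
}
```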

(3) When Master.startExecutorsOnWorkers allocates resources to run the application, it calls allocateWorkerResourceToExecutors to start Executors on the Workers. When a Worker receives the LaunchExecutor message from the Master, it first instantiates an ExecutorRunner; while starting, the ExecutorRunner creates a process builder (ProcessBuilder), which uses the command to create the CoarseGrainedExecutorBackend object, the container in which the Executor runs. Finally the Worker sends an ExecutorStateChanged message to the Master to report that the Executor container has been created.

``` scala
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }

      // Create the executor's local directories; the Worker deletes them
      // when the application finishes
      val appLocalDirs = appDirectories.getOrElse(appId,
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          val appDir = Utils.createDirectory(dir, namePrefix = "executor")
          Utils.chmod700(appDir)
          appDir.getAbsolutePath()
        }.toSeq)
      appDirectories(appId) = appLocalDirs

      // The ExecutorRunner creates the CoarseGrainedExecutorBackend object
      // using the command in the application description; the command itself
      // is built in StandaloneSchedulerBackend's start method
      val manager = new ExecutorRunner(appId, execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_, memory_, self, workerId, host, webUi.boundPort, publicAddress,
        sparkHome, executorDir, workerUri, conf, appLocalDirs, ExecutorState.RUNNING)
      executors(appId + "/" + execId) = manager
      manager.start() // Start the ExecutorRunner
      coresUsed += cores_
      memoryUsed += memory_
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    } catch {...}
  }
```

The work happens in fetchAndRunExecutor, invoked during ExecutorRunner startup. The command it runs is defined in StandaloneSchedulerBackend and specifies CoarseGrainedExecutorBackend, the Executor's runtime container, as the class to launch. The code is as follows:

``` scala
private def fetchAndRunExecutor() {
  try {
    // Build the process builder from the application description and config
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")

    // Add the working directory and environment to the builder
    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

    // Add webUI log urls
    val baseUrl =
      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

    // Start the process, creating the CoarseGrainedExecutorBackend instance
    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      formattedCommand, "=" * 40)

    // Capture the CoarseGrainedExecutorBackend instance's output
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, StandardCharsets.UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

    // Wait for CoarseGrainedExecutorBackend to finish; on exit, report its
    // state to the Worker
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
  } catch {...}
}
```
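The quoting of the "Launch command" log line above is plain mkString with quote delimiters. A self-contained sketch (LaunchCommand is a hypothetical object name):

``` scala
// Sketch of the log formatting in fetchAndRunExecutor: every argument is
// wrapped in double quotes and the arguments are joined by spaces.
object LaunchCommand {
  def format(command: Seq[String]): String =
    command.mkString("\"", "\" \"", "\"")
}
```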

(4) The Master receives the ExecutorStateChanged message sent by the Worker. The code is as follows:

``` scala
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  // Find the executor's application, then flatMap to look the executor up
  // in the application's internal cache
  val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
  execOption match {
    case Some(exec) =>
      // Record the executor's current state
      val appInfo = idToApp(appId)
      val oldState = exec.state
      exec.state = state

      if (state == ExecutorState.RUNNING) {
        assert(oldState == ExecutorState.LAUNCHING,
          s"executor $execId state transfer from $oldState to RUNNING is illegal")
        appInfo.resetRetryCount()
      }
      // Forward an ExecutorUpdated message to the Driver
      exec.application.driver.send(ExecutorUpdated(execId, state, message, exitStatus, false))
      ...
```

(5) In the onStart method of the CoarseGrainedExecutorBackend launched in step 3, a RegisterExecutor message is sent to the DriverEndpoint. The DriverEndpoint first checks whether the Executor is already registered, then allocates task resources in makeOffers(), and finally sends LaunchTask messages to run tasks.

``` scala
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
  if (executorDataMap.contains(executorId)) {
    executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
    context.reply(true)
  } else {
    ...
    // Record the executor's ID and the number of cores it will use
    addressToExecutorId(executorAddress) = executorId
    totalCoreCount.addAndGet(cores)
    totalRegisteredExecutors.addAndGet(1)
    val data = new ExecutorData(executorRef, executorRef.address, hostname,
      cores, cores, logUrls)
    // Add the executor's ID and details to the executor map
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)
      if (currentExecutorIdCounter < executorId.toInt) {
        currentExecutorIdCounter = executorId.toInt
      }
      if (numPendingExecutors > 0) {
        numPendingExecutors -= 1
        logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
      }
    }
    // Reply that registration is complete and post an executor-added event
    // on the listener bus
    executorRef.send(RegisteredExecutor)
    context.reply(true)
    listenerBus.post(
      SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
    // Allocate task resources and send LaunchTask messages to run tasks
    makeOffers()
  }
```

(6) When CoarseGrainedExecutorBackend receives the RegisteredExecutor message confirming registration, it instantiates an Executor object inside the CoarseGrainedExecutorBackend container. Once started, the Executor sends periodic heartbeats to the Driver and waits for task-launch messages from the DriverEndpoint. The registration-success handling in CoarseGrainedExecutorBackend is as follows:

``` scala
// Registration with the driver succeeded and a RegisteredExecutor message came back
case RegisteredExecutor =>
  logInfo("Successfully registered with driver")
  try {
    // Create the Executor, which heartbeats to the Driver periodically while
    // waiting for the Driver to hand down tasks
    executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
  } catch {...}
```

(7) After its Executor has started, CoarseGrainedExecutorBackend receives LaunchTask messages from the DriverEndpoint; task execution is implemented in Executor.launchTask. A TaskRunner is created to process the task; when processing completes, a StatusUpdate message is sent back through CoarseGrainedExecutorBackend. Task execution and result retrieval are covered later.

``` scala
def launchTask(context: ExecutorBackend, taskId: Long,
    attemptNumber: Int, taskName: String, serializedTask: ByteBuffer): Unit = {
  // Create one TaskRunner per task
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  // Cache the TaskRunner in memory
  runningTasks.put(taskId, tr)
  // Hand the TaskRunner to the thread pool, which queues it automatically
  threadPool.execute(tr)
}
```
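The launchTask pattern above, tracking each task in a concurrent map while a pool runs it, can be reproduced in a self-contained sketch with simplified signatures (TaskLaunchSketch is a hypothetical name):

``` scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Self-contained sketch of Executor.launchTask's pattern: each task is
// tracked in a concurrent map while it runs and is handed to a fixed
// thread pool, which queues excess work automatically.
object TaskLaunchSketch {
  private val runningTasks = new ConcurrentHashMap[Long, Runnable]()
  private val threadPool = Executors.newFixedThreadPool(2)

  def launchTask(taskId: Long, body: () => Unit): Unit = {
    val tr: Runnable = () => {
      try body() finally runningTasks.remove(taskId) // drop from the cache when done
    }
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

  def runningCount: Int = runningTasks.size

  def shutdownAndWait(): Unit = {
    threadPool.shutdown()
    threadPool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
```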

(8) When a TaskRunner finishes its task, it sends a StatusUpdate message to the DriverEndpoint. On receiving it, the DriverEndpoint calls TaskSchedulerImpl.statusUpdate to handle the task's result; afterwards, the Executor is offered further tasks. The code is as follows:

``` scala
case StatusUpdate(executorId, taskId, state, data) =>
  // Let TaskSchedulerImpl.statusUpdate handle the result of the task
  scheduler.statusUpdate(taskId, state, data.value)
  if (TaskState.isFinished(state)) {
    executorDataMap.get(executorId) match {
      // When the task finishes, return the CPUs it used to the Executor,
      // then make new offers as appropriate
      case Some(executorInfo) =>
        executorInfo.freeCores += scheduler.CPUS_PER_TASK
        makeOffers(executorId)
      case None => ...
    }
  }
```
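The core-accounting step can be isolated as a tiny pure function. This is a sketch with hypothetical types; CPUS_PER_TASK corresponds to spark.task.cpus, whose default is 1, and the clamp to totalCores is a defensive addition not present in the original:

``` scala
// Sketch of the bookkeeping in the StatusUpdate handler: a finished task
// returns its CPUs to the executor's free pool, which is what makes the
// executor eligible in the next makeOffers round. Hypothetical types.
object ExecutorCores {
  final case class ExecutorInfo(totalCores: Int, freeCores: Int)

  val CPUS_PER_TASK = 1 // assumed default of spark.task.cpus

  def onTaskFinished(info: ExecutorInfo): ExecutorInfo =
    info.copy(freeCores = math.min(info.totalCores, info.freeCores + CPUS_PER_TASK))
}
```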

