Spark源碼分析 之 Driver和Excutor是怎么跑起來的?(2.2.0版本)


今天抽空回顧了一下Spark相關的源碼,本來想要了解一下Block的管理機制,但是看着看着就回到了SparkContext的創建與使用。正好之前沒有正式的整理過這部分的內容,這次就順帶着回顧一下。

更多內容參考:我的大數據之路

Spark作為目前最流行的大數據計算框架,已經發展了幾個年頭了。版本也從我剛接觸的1.6升級到了2.2.1。由於目前工作使用的是2.2.0,所以這次的分析也就從2.2.0版本入手了。

涉及的內容主要有:

  • Standalone模式中的Master與Worker
  • client、driver、excutor的關系

下面就按照順序依次介紹一下。

Master與Worker

在最開始編程的時候,很少會涉及分布式,因為數據量也不大。后來隨着硬件的發展cpu的瓶頸,開始流行多線程編程,基於多線程來加快處理速度;再后來,衍生出了網格計算、CPU與GPU的異構並行計算以及當時流行的mapreduce分布式計算。但是mapreduce由於存儲以及計算流程的限制,spark開始流行起來。Spark憑借內存計算、強大的DAG回溯能力,快速的占領並行計算的風口。

那么並行計算肯定是需要分布式集群的,常見的集群管理方式,有Master-Slave模式、P2P模式等等。

比如Mysql的主從復制,就是Master-Slave模式;Elasticsearch的分片管理就是P2P模式。在Spark中有不同的部署方式,但是計算的模式都是Master-Slave模式,只不過Slave換了名字叫做worker而已。集群的部署模式如下所示:

流程就是用戶以client的身份向master提交任務,master去worker上面創建執行任務的載體(driver和excutor)。

client、driver、excutor的關系

Master和Worker是服務器的部署角色,程序從執行上,則分成了client、driver、excutor三種角色。按照模式的不同,client和driver可能是同一個。以2.2.0版本的standalone模式來說,他們三個是獨立的角色。client用於提交程序,初始化一些環境變量;driver用於生成task並追蹤管理task的運行;excutor負責最終task的執行。

源碼探索

總的流程可以總結為下面的一張圖:

通過查看源碼,來看一下

1 SparkContext創建調度器

在創建SparkContext的時候會創建幾個核心的模塊:

  1. DAGScheduler 面向job的調度器
  2. TaskScheduler 不同的集群模式,有不同的實現方式,如standalone下的taskschedulerImpl
  3. SchedulerBackend 不同的集群模式下,有不同的實現方式,如standalone下的StandaloneSchedulerBackend.負責向master發起注冊
// 創建並啟動調度器
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
...
// 啟動調度器
_taskScheduler.start()

在createTaskSchduler中,根據master的不同,選擇不同的實現方式,主要是在backend的實現上有差異:

master match {
      case "local" =>
        ...

      case LOCAL_N_REGEX(threads) =>
        ...

      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        ...

      case SPARK_REGEX(sparkUrl) =>
        // 創建調度器
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        // 創建backend
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        // 把backend注入到schduler中
        scheduler.initialize(backend)
        (backend, scheduler)

      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        ...

      case masterUrl =>
        ...
    }

我們這里只看一下standalone模式的創建,就是創建了TaskSchedulerImpl和StandaloneSchedulerBackend的對象,另外初始化了調度器,根據配置選擇調度模式,默認是FIFO:

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }

2 TaskSchedulerImpl執行start方法

其實是執行了backend的start()方法

override def start() {
    backend.start()
    ...
  }

3 StandaloneSchedulerBackend執行start方法

這部分代碼比較多,可以簡化的看:

  • 封裝command對象
  • 封裝appDesc對象
  • 創建StandaloneAppClient對象
  • 執行start()方法

其中command中包含的那個類,就是excutor的實現類。

override def start() {
    //初始化參數
    ...
    
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
   ...
    val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
    // 注意前面創建了一大堆的配置對象,主要就是那個class等信息
	client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
    client.start()
    ...
  }

4 發起注冊

核心的代碼在StanaloneAppClient中,並在start()方法中啟動了一個rpc的服務——ClientEndpoint

override def onStart(): Unit = {
  try {
    registerWithMaster(1)//發起注冊
  } catch {
    ...
  }
}

registerWithMaster采用了異步發送請求連接master,只要有一個注冊成功,其他的都會cancel。這里有時間可以做個小hello world玩玩看。

private def registerWithMaster(nthRetry: Int) {
  registerMasterFutures.set(tryRegisterAllMasters())
  registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
    override def run(): Unit = {
      if (registered.get) {
        registerMasterFutures.get.foreach(_.cancel(true))
        registerMasterThreadPool.shutdownNow()
      } else if (nthRetry >= REGISTRATION_RETRIES) {
        markDead("All masters are unresponsive! Giving up.")
      } else {
        registerMasterFutures.get.foreach(_.cancel(true))
        registerWithMaster(nthRetry + 1)
      }
    }
  }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
}
//發起注冊
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  ...
  masterRef.send(RegisterApplication(appDescription, self))
  ...
}

5 Master接收到請求執行schedule方法

Master是一個常駐的進程,時刻監聽別人發過來的消息。剛才client發送了一個RegisterApplication消息,忽略前面創建app的內容,直接執行了schedule方法:

case RegisterApplication(description, driver) =>
   // TODO Prevent repeated registrations from some driver
   if (state == RecoveryState.STANDBY) {
     // ignore, don't send response
   } else {
     ...
     schedule()
   }

6 Master發送launchDriver

發送lanunchDriver請求

private def schedule(): Unit = {
  ...
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    ...
    while (numWorkersVisited < numWorkersAlive && !launched) {
      ...
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver)
        ...
      }
      ...
    }
  }
  startExecutorsOnWorkers()
}
//向worker發送launchDriver請求
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  ...
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  ...
}

7 Worker創建DriverRunner

case LaunchDriver(driverId, driverDesc) =>
      logInfo(s"Asked to launch driver $driverId")
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
      drivers(driverId) = driver
      driver.start()

      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem

8 Master發送launchExcutor

第6步中最后有一個startExecutorsOnWorkers方法。

private def startExecutorsOnWorkers(): Unit = {
...
  for (app <- waitingApps if app.coresLeft > 0) {
    ...
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}

private def allocateWorkerResourceToExecutors(
   app: ApplicationInfo,
   assignedCores: Int,
   coresPerExecutor: Option[Int],
   worker: WorkerInfo): Unit = {
 ...
 for (i <- 1 to numExecutors) {
   ...
   launchExecutor(worker, exec)
   ...
 }
}

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  ...
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  ...
}

9 Worker創建ExcutorRunner

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
if (masterUrl != activeMasterUrl) {
  ...
} else {
  try {
    ...
    val manager = new ExecutorRunner(
      appId,
      execId,
      appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
      cores_,
      memory_,
      self,
      workerId,
      host,
      webUi.boundPort,
      publicAddress,
      sparkHome,
      executorDir,
      workerUri,
      conf,
      appLocalDirs, ExecutorState.RUNNING)
    ...
  } catch {
...
  }
}

至此,Driver和Excutor就啟動起來了.....

之后代碼是怎么運行的,就且聽下回分解把!

參考

  1. SparkContext http://www.cnblogs.com/jcchoiling/p/6427406.html
  2. spark worker解密:http://www.cnblogs.com/jcchoiling/p/6433196.html
  3. 2.2.0源碼
  4. 《Spark內核機制及性能調優》· 王家林


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM