Spark Task Submission: Source Code Analysis


Client-Side Execution

Below is a submission command for Spark on YARN in cluster mode. All of the analysis in this series is based on Spark on YARN cluster mode, Spark version 2.4.0:

spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 512M \
--executor-memory 512M \
--num-executors 1 \
/opt/cloudera/parcels/CDH/lib/spark/examples/jars/spark-examples_2.11-2.4.0-cdh6.3.2.jar

spark-submit is a shell script with the following contents:

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

Every parameter passed to spark-submit is ultimately handed to org.apache.spark.deploy.SparkSubmit through the exec call.
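
Concretely, spark-class uses the launcher library (org.apache.spark.launcher.Main) to assemble the final JVM command line, so what actually runs on the client machine is roughly "java -cp <Spark classpath> org.apache.spark.deploy.SparkSubmit <the arguments shown above>". This is a simplified description; the exact command depends on your environment.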

The SparkSubmit Class

The main Method

SparkSubmit's main method lives in its companion object; an abridged version of the source looks like this:

  override def main(args: Array[String]): Unit = {
    val submit = new SparkSubmit() {
      self =>
      override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
        ...
      }
      override protected def logInfo(msg: => String): Unit = printMessage(msg)
      override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
      override def doSubmit(args: Array[String]): Unit = {
        ...
        super.doSubmit(args)
        ...
      }
    }
    submit.doSubmit(args)
  }

As shown above, main submits the job by calling doSubmit on the SparkSubmit instance. doSubmit looks like this:

  def doSubmit(args: Array[String]): Unit = {
    ...
    val appArgs = parseArguments(args)
    ...
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
  }

In doSubmit, the parameters supplied to spark-submit are parsed first, and pattern matching then dispatches each action to its own method. Our command above is a plain submission, so execution continues in the submit method.
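
The action itself is decided during argument parsing: SUBMIT is the default, the --kill and --status options (for cluster deployments that support them) select KILL and REQUEST_STATUS, and --version selects PRINT_VERSION.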

The submit Method

An abridged version of submit:

  private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    // Parse the arguments and obtain the childMainClass to run
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              // Invoke runMain as the proxy user
              runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
            }
          })
        } catch {
          ...
        }
      } else {
        // Invoke runMain
        runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
      }
    }
    if (args.isStandaloneCluster && args.useRest) {
      try {
        doRunMain()
      } catch {
        ...
      }
    } else {
      doRunMain()
    }
  }

As the code shows, submit mainly parses the user-supplied arguments and then calls doRunMain, which in turn calls runMain. The extra indirection exists because a check is made before runMain actually runs: if the job targets a standalone cluster, there are two submission paths. One wraps the request in org.apache.spark.deploy.Client and goes through the legacy RPC gateway; the other is the REST-based submission introduced in Spark 1.3, which has been the default for standalone cluster mode ever since. If the master does not expose a REST endpoint, the REST attempt fails, and the error handler falls back to the legacy RPC path.
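
The fallback itself is just a try/catch around doRunMain. Here is a minimal sketch of the pattern, with RestSubmitFailure standing in for the REST-connection exception Spark actually catches (names simplified, not Spark source):

  // Sketch of the "try REST first, then fall back to legacy RPC" pattern used by submit().
  object RestFallbackSketch {
    class RestSubmitFailure(msg: String) extends Exception(msg)

    def doRunMain(viaRest: Boolean): Unit =
      if (viaRest) throw new RestSubmitFailure("master has no REST endpoint")
      else println("submitted through the legacy RPC gateway")

    def submit(isStandaloneCluster: Boolean, useRest: Boolean): Unit =
      if (isStandaloneCluster && useRest) {
        try doRunMain(viaRest = true)
        catch {
          // The real code flips args.useRest to false and re-enters submit().
          case _: RestSubmitFailure => doRunMain(viaRest = false)
        }
      } else {
        doRunMain(viaRest = false)
      }

    def main(args: Array[String]): Unit =
      submit(isStandaloneCluster = true, useRest = true) // REST fails here, so it falls back to RPC
  }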

Inside the argument-preparation method prepareSubmitEnvironment there is an important piece of code:

...
private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
    "org.apache.spark.deploy.yarn.YarnClusterApplication"
...
private[deploy] def prepareSubmitEnvironment(...)
      : (Seq[String], Seq[String], SparkConf, String) = {
        if (isYarnCluster) {
            childMainClass = YARN_CLUSTER_SUBMIT_CLASS
            ...
        }
        ...
        (childArgs, childClasspath, sparkConf, childMainClass)
      }

This code checks whether we are running in YARN cluster mode; if so, it assigns "org.apache.spark.deploy.yarn.YarnClusterApplication" to the variable childMainClass.

Back in submit, outside prepareSubmitEnvironment, the childMainClass value is extracted from the returned tuple and passed on to runMain.

The runMain Method

The runMain source is as follows:

  private def runMain(...): Unit = {
    ...
    var mainClass: Class[_] = null
    try {
      // Load the YarnClusterApplication class via reflection
      mainClass = Utils.classForName(childMainClass)
    } catch {
      ...
    }

    // If mainClass implements SparkApplication, instantiate it directly; otherwise wrap it in a JavaMainApplication
    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      mainClass.newInstance().asInstanceOf[SparkApplication]
    } else {
      ...
      new JavaMainApplication(mainClass)
    }
      ...
    try {
      app.start(childArgs.toArray, sparkConf)
    } catch {
      ...
    }
  }

In runMain, the class named by childMainClass is loaded via reflection. As we saw above, childMainClass is "org.apache.spark.deploy.yarn.YarnClusterApplication", which implements the SparkApplication trait, so a YarnClusterApplication instance is created and its start method is called.

The YarnClusterApplication Class

The start Method

YarnClusterApplication's start method is very simple:

  override def start(args: Array[String], conf: SparkConf): Unit = {
    new Client(new ClientArguments(args), conf).run()
  }

The Client Class

Client is the last class to run on the submitting user's machine: it talks to Hadoop YARN and submits the ApplicationMaster.

Client's constructor instantiates the YARN client and reads many parameters such as amMemory.

The important parameter initialization in Client's constructor:

  private val yarnClient = YarnClient.createYarnClient
  private val hadoopConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf))

  private val isClusterMode = sparkConf.get("spark.submit.deployMode", "client") == "cluster"

  private val amMemory = if (isClusterMode) {
    sparkConf.get(DRIVER_MEMORY).toInt
  } else {
    sparkConf.get(AM_MEMORY).toInt
  }
  private val amMemoryOverhead = {
    val amMemoryOverheadEntry = if (isClusterMode) DRIVER_MEMORY_OVERHEAD else AM_MEMORY_OVERHEAD
    sparkConf.get(amMemoryOverheadEntry).getOrElse(
      math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
  }
  private val amCores = if (isClusterMode) {
    sparkConf.get(DRIVER_CORES)
  } else {
    sparkConf.get(AM_CORES)
  }

  private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
  private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
    math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
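
Plugging our example submission into the formulas above (--driver-memory 512M and --executor-memory 512M, cluster mode), and assuming the Spark 2.4 defaults of MEMORY_OVERHEAD_FACTOR = 0.10 and MEMORY_OVERHEAD_MIN = 384 MB (an assumption worth checking against your build), the requested container sizes work out as in this small sketch:

  // Worked example of the overhead formula above; the two constants are assumed defaults.
  object MemorySizingSketch {
    val MEMORY_OVERHEAD_FACTOR = 0.10 // assumed Spark 2.4 default
    val MEMORY_OVERHEAD_MIN = 384L    // assumed Spark 2.4 default, in MB

    def overhead(memoryMb: Long): Long =
      math.max((MEMORY_OVERHEAD_FACTOR * memoryMb).toLong, MEMORY_OVERHEAD_MIN)

    def main(args: Array[String]): Unit = {
      val amMemory = 512L       // --driver-memory, used for the AM in cluster mode
      val executorMemory = 512L // --executor-memory
      println(s"AM container:       ${amMemory + overhead(amMemory)} MB")             // 512 + 384 = 896
      println(s"Executor container: ${executorMemory + overhead(executorMemory)} MB") // 512 + 384 = 896
    }
  }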

The run Method

The run method source:

  def run(): Unit = {
    this.appId = submitApplication()
    ...
  }

run simply calls Client's submitApplication method to obtain the application's appId; the actual submission logic lives in submitApplication.

The submitApplication Method

The submitApplication source:

  def submitApplication(): ApplicationId = {
    var appId: ApplicationId = null
    try {  
      launcherBackend.connect()
      // Initialize and start the YARN client
      yarnClient.init(hadoopConf)
      yarnClient.start()
      // Ask YARN for a unique application id
      val newApp = yarnClient.createApplication()
      val newAppResponse = newApp.getNewApplicationResponse()
      appId = newAppResponse.getApplicationId()
      ...
      // Prepare the container launch context to submit to YARN
      val containerContext = createContainerLaunchContext(newAppResponse)
      val appContext = createApplicationSubmissionContext(newApp, containerContext)
      ...
      // Submit the application
      yarnClient.submitApplication(appContext)
      launcherBackend.setAppId(appId.toString)
      reportLauncherState(SparkAppHandle.State.SUBMITTED)

      appId
    } catch {
      ...
  }

The createContainerLaunchContext Method

The main parts of createContainerLaunchContext:

  private def createContainerLaunchContext(...) : ContainerLaunchContext = {
    val amClass =
      if (isClusterMode) {
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }
    ...
    val amArgs =
      Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
      Seq("--properties-file", buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE))

    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++ amArgs ++
      Seq(
        "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
        "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")

    val printableCommands = commands.map(s => if (s == null) "null" else s).toList
    amContainer.setCommands(printableCommands.asJava)

    val securityManager = new SecurityManager(sparkConf)
    amContainer.setApplicationACLs(
      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager).asJava)
    setupSecurityToken(amContainer)
    amContainer
  }

As the source above shows, createContainerLaunchContext assembles the launch command, essentially "/bin/java -server" plus javaOpts plus amArgs, wraps it into amContainer, and hands it to YARN.

The amClass inside amArgs is org.apache.spark.deploy.yarn.ApplicationMaster in cluster mode and org.apache.spark.deploy.yarn.ExecutorLauncher in client mode.
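
Concretely, for our cluster-mode example the AM container command ends up looking roughly like <JAVA_HOME>/bin/java -server <javaOpts> org.apache.spark.deploy.yarn.ApplicationMaster --class org.apache.spark.examples.SparkPi --jar <the examples jar> --properties-file <the localized Spark conf> 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr. The placeholder values here are illustrative, not taken from a real run.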

This article only covers cluster mode. At this point, every piece of cluster-mode code that runs on the submitting user's machine has finished; everything from here on runs inside YARN containers.

Method Execution Sequence Diagram

(Sequence diagram: client-side execution)

Driver

The ApplicationMaster Class

The main Method

  def main(args: Array[String]): Unit = {
    SignalUtils.registerLogger(log)
    val amArgs = new ApplicationMasterArguments(args)
    master = new ApplicationMaster(amArgs)
    System.exit(master.run())
  }

The run Method

  final def run(): Int = {
    doAsUser {
      runImpl()
    }
    exitCode
  }

The runImpl Method

The key code of runImpl:

  private def runImpl(): Unit = {
    try {
      val appAttemptId = client.getAttemptId()
      var attemptID: Option[String] = None
      ...
      if (isClusterMode) {
        runDriver()
      } else {
        runExecutorLauncher()
      }
    } catch {
      ...
    } finally {
      ...
    }
  }

The ApplicationMaster class calls down layer by layer: main instantiates an ApplicationMaster, then calls its run method, which delegates to runImpl.

runImpl checks whether we are in cluster mode: if so it calls runDriver, otherwise (client mode) it calls runExecutorLauncher.

This article covers cluster mode.

The runDriver Method

  private def runDriver(): Unit = {
    // Run the user-submitted main method on a separate thread
    userClassThread = startUserApplication()
    ...
    try {
      val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
        Duration(totalWaitTime, TimeUnit.MILLISECONDS))
      if (sc != null) {
        rpcEnv = sc.env.rpcEnv
        ...
        val driverRef = rpcEnv.setupEndpointRef(
          RpcAddress(host, port),
          YarnSchedulerBackend.ENDPOINT_NAME)
        // Request containers from YARN and launch the executors
        createAllocator(driverRef, userConf)
      } else {
          ...
      }
      resumeDriver()
      userClassThread.join()
    } catch {
      ...
    } finally {
      resumeDriver()
    }
  }

In runDriver, startUserApplication creates a thread that runs the user program's main method, while createAllocator requests resources from YARN and launches the executors.

The startUserApplication Method

  private def startUserApplication(): Thread = {
    
    var userArgs = args.userArgs
    
    val mainMethod = userClassLoader.loadClass(args.userClass)
      .getMethod("main", classOf[Array[String]])

    val userThread = new Thread {
      override def run() {
        try {
          if (!Modifier.isStatic(mainMethod.getModifiers)) {
            logError(s"Could not find static main method in object ${args.userClass}")
            finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_EXCEPTION_USER_CLASS)
          } else {
            mainMethod.invoke(null, userArgs.toArray)
            finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
            logDebug("Done running user class")
          }
        } catch {
          ...
        } finally {
          ...
        }
      }
    }
    userThread.setContextClassLoader(userClassLoader)
    userThread.setName("Driver")
    userThread.start()
    userThread
  }

startUserApplication first resolves the class we specified with "--class" in the submission command, then checks that this class has a static "main" method taking an Array[String], i.e. the entry point of a Scala/Java program, and finally invokes that main method via reflection on a thread named "Driver". So, having walked the source, we now know that what we usually call the "Driver" is in fact just a thread named "Driver" that runs the user program.
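
As a minimal stand-alone illustration of that mechanism (toy code, not Spark source), this is essentially all the "Driver" thread does:

  import java.lang.reflect.Modifier

  // A toy "user program"; its main is the static entry point that --class would point at.
  object DemoUserApp {
    def main(args: Array[String]): Unit =
      println(s"user main running on thread '${Thread.currentThread().getName}' with args ${args.mkString(",")}")
  }

  // Look up a static main(Array[String]) by reflection and invoke it on a thread named "Driver".
  object DriverThreadSketch {
    def main(args: Array[String]): Unit = {
      val mainMethod = Class.forName("DemoUserApp").getMethod("main", classOf[Array[String]])
      require(Modifier.isStatic(mainMethod.getModifiers), "entry point must be a static main")

      val userThread = new Thread {
        override def run(): Unit = mainMethod.invoke(null, Array("10"))  // Array("10") stands in for the user args
      }
      userThread.setName("Driver")
      userThread.start()
      userThread.join()
    }
  }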

While the user program's main method runs, it initializes a SparkContext. Inside the SparkContext constructor, the TaskScheduler is created and its postStartHook() method is called, which hands the SparkContext back to the ApplicationMaster for later use. The relevant source:

// SparkContext init
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts

......

_taskScheduler.postStartHook()

// YarnClusterScheduler
override def postStartHook() {
    ApplicationMaster.sparkContextInitialized(sc)
    super.postStartHook()
    logInfo("YarnClusterScheduler.postStartHook done")
}

// ApplicationMaster
private[spark] def sparkContextInitialized(sc: SparkContext): Unit = {
    master.sparkContextInitialized(sc)
}
private def sparkContextInitialized(sc: SparkContext) = {
    sparkContextPromise.synchronized {
        // Notify runDriver function that SparkContext is available
        sparkContextPromise.success(sc)
        // Pause the user class thread in order to make proper initialization in runDriver function.
        sparkContextPromise.wait()
    }
}
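
The hand-off works through a Promise combined with monitor wait/notify: the user ("Driver") thread completes the promise and then parks itself, while runDriver awaits the future, finishes its own setup (creating the allocator), and finally wakes the user thread via resumeDriver(). A minimal sketch of that rendezvous, with a String standing in for the SparkContext (illustrative only):

  import scala.concurrent.{Await, Promise}
  import scala.concurrent.duration._

  // Minimal sketch of the sparkContextPromise rendezvous between the user thread and runDriver.
  object PromiseHandshakeSketch {
    private val contextPromise = Promise[String]()   // stands in for sparkContextPromise

    def main(args: Array[String]): Unit = {
      val userThread = new Thread {
        override def run(): Unit = {
          contextPromise.synchronized {
            contextPromise.success("SparkContext stand-in")  // like sparkContextInitialized(sc)
            contextPromise.wait()                            // paused until "resumeDriver" notifies us
          }
          println("user thread resumed; user code runs to completion")
        }
      }
      userThread.setName("Driver")
      userThread.start()

      // runDriver side: block until the promise is completed, finish setup, then wake the user thread.
      val sc = Await.result(contextPromise.future, 10.seconds)
      println(s"runDriver received: $sc (would now create the allocator)")
      contextPromise.synchronized { contextPromise.notify() } // like resumeDriver()
      userThread.join()
    }
  }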

The createAllocator Method

createAllocator is how the Driver talks to YARN to request resources and run executors in the containers it is granted. The call stack inside createAllocator is deep, so the calls are condensed here; the abridged source is as follows:

  private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
    val appId = client.getAttemptId().getApplicationId().toString()
    val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
    // Obtain the YarnAllocator, which handles all communication with YARN
    allocator = client.createAllocator(
      yarnConf,
      _sparkConf,
      driverUrl,
      driverRef,
      securityMgr,
      localResources)
    credentialRenewer.foreach(_.setDriverRef(driverRef))
    rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))
    // Request resources
    allocator.allocateResources()
    ...
  }

  def allocateResources(): Unit = synchronized {
    updateResourceRequests()

    val progressIndicator = 0.1f
    val allocateResponse = amClient.allocate(progressIndicator)

    val allocatedContainers = allocateResponse.getAllocatedContainers()
    allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)

    if (allocatedContainers.size > 0) {
      ...
      handleAllocatedContainers(allocatedContainers.asScala)
    }
    ...
  }

  def handleAllocatedContainers(allocatedContainers: Seq[Container]): Unit = {
    // (a series of container matching/validation steps omitted)
    ...
    runAllocatedContainers(containersToUse)
  }

  private def runAllocatedContainers(containersToUse: ArrayBuffer[Container]): Unit = {
    for (container <- containersToUse) {
      ...
      if (runningExecutors.size() < targetNumExecutors) {
        numExecutorsStarting.incrementAndGet()
        if (launchContainers) {
          // Launch the executor from the launcher thread pool
          launcherPool.execute(new Runnable {
            override def run(): Unit = {
              try {
                new ExecutorRunnable(
                  Some(container),
                  conf,
                  sparkConf,
                  driverUrl,
                  executorId,
                  executorHostname,
                  executorMemory,
                  executorCores,
                  appAttemptId.getApplicationId.toString,
                  securityMgr,
                  localResources
                ).run()   // the method that actually launches the executor
                updateInternalState()
              } catch {
                ...
              }
            }
          })
        } else {
          updateInternalState()
        }
      } else {
        ...
      }
    }
  }

From this series of methods we can see that Spark breaks executor launch into four steps: obtaining the allocator via the YARN client, asking YARN for allocatable containers, validating the allocated containers, and launching executors in them.

How is the number of executors decided? The YarnAllocator class holds a targetNumExecutors value that is fixed when YarnAllocator is initialized. In runAllocatedContainers, as long as the number of running executors is below targetNumExecutors, new executors are created until the target is reached. targetNumExecutors is determined as follows:

// YarnAllocator
  @volatile private var targetNumExecutors =
    SchedulerBackendUtils.getInitialTargetExecutorNumber(sparkConf)

// SchedulerBackendUtils
  val DEFAULT_NUMBER_EXECUTORS = 2
  def getInitialTargetExecutorNumber(
      conf: SparkConf,
      numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = {
    if (Utils.isDynamicAllocationEnabled(conf)) {
      val minNumExecutors = conf.get(DYN_ALLOCATION_MIN_EXECUTORS)
      val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
      val maxNumExecutors = conf.get(DYN_ALLOCATION_MAX_EXECUTORS)
      require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors,
        s"initial executor number $initialNumExecutors must between min executor number " +
          s"$minNumExecutors and max executor number $maxNumExecutors")

      initialNumExecutors
    } else {
      conf.get(EXECUTOR_INSTANCES).getOrElse(numExecutors)
    }
  }
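
Applied to our example submission: dynamic allocation is off and --num-executors 1 translates to spark.executor.instances = 1, so targetNumExecutors is 1 and runAllocatedContainers starts exactly one executor; without --num-executors (and without dynamic allocation) it would fall back to DEFAULT_NUMBER_EXECUTORS = 2.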

The ExecutorRunnable Class

The run Method

  def run(): Unit = {
    ...
    startContainer()
  }  
  def startContainer(): java.util.Map[String, ByteBuffer] = {
    ...
    val commands = prepareCommand()
    ctx.setCommands(commands.asJava)
    ...
    try {
      nmClient.startContainer(container.get, ctx)
    } catch {
      ...
    }
  }

In ExecutorRunnable's run method we again see the familiar command-preparation step; in the end, submitting work to YARN always comes down to submitting a list of commands.

The prepareCommand Method

  private def prepareCommand(): List[String] = {
    ...
    val commands = prefixEnv ++
      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
      javaOpts ++
      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
        "--driver-url", masterAddress,
        "--executor-id", executorId,
        "--hostname", hostname,
        "--cores", executorCores.toString,
        "--app-id", appId) ++
      userClassPath ++
      Seq(
        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")

    commands.map(s => if (s == null) "null" else s).toList
  }

As the code shows, what an executor actually runs is the class "org.apache.spark.executor.CoarseGrainedExecutorBackend".
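
For our example submission this works out to roughly <JAVA_HOME>/bin/java -server <javaOpts> org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@<driver-host>:<port> --executor-id 1 --hostname <node> --cores 1 --app-id application_<...>, with stdout and stderr redirected into the container's log directory (again, the placeholder values are illustrative).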

At this point, all of the Driver-side work for task submission is done.

Method Execution Sequence Diagram

(Sequence diagram: Driver initialization)

Executor

In the Driver analysis above, we found that the Driver launches executors by submitting to YARN a command that runs the class "org.apache.spark.executor.CoarseGrainedExecutorBackend".

The CoarseGrainedExecutorBackend Class

The Executor Startup Methods

  def main(args: Array[String]) {
    ...
    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
    System.exit(0)
  }

  private def run(...) {
      ...
      val env = SparkEnv.createExecutorEnv(
        driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false)

      env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
        env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
      workerUrl.foreach { url =>
        env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
      }
      env.rpcEnv.awaitTermination()
    }
  }

From these startup methods we can see that what we usually call the Executor is really an endpoint named "Executor". An endpoint is a concept from Spark's actor-style RPC layer, which the next article covers in detail; for now it is enough to know that once an endpoint is registered, its onStart method is invoked.

The CoarseGrainedExecutorBackend Class

The onStart Method

  override def onStart() {
    logInfo("Connecting to driver: " + driverUrl)
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      driver = Some(ref)
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
    }(ThreadUtils.sameThread).onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        // Always receive `true`. Just ignore it
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }

In onStart, CoarseGrainedExecutorBackend sends a "RegisterExecutor" message to CoarseGrainedSchedulerBackend to register itself. Once CoarseGrainedSchedulerBackend has processed the registration, it sends back a "RegisteredExecutor" message; on receiving it, CoarseGrainedExecutorBackend initializes the Executor.

Method Execution Sequence Diagram

(Sequence diagram: Executor initialization)

