Spark on YARN Startup Flow: Source Code Analysis (Part 1)


Main reference:

a. https://www.cnblogs.com/yy3b2007com/p/10934090.html

0. Notes

a. These Spark source-code notes will be updated and extended from time to time.

b. Earlier posts on the Spark source code will also be revised, extended, and polished from time to time.

c. The Spark version these notes are based on is 2.4.1.

1. Introduction

This part analyzes the code that runs when the spark-submit script is executed and the job is handed over to YARN. The Spark code version is 2.4.1.

(1) The spark-submit entry point

A Spark job is normally submitted with spark-submit:

# Run on a Spark standalone cluster
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000

The example above submits to a standalone cluster. The spark-submit script itself can be found in the bin directory of the Spark 2.4 installation:

#!/usr/bin/env bash

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

From the script we can see that:

a. when spark-submit submits a job, it ultimately invokes the SparkSubmit class;

b. it does so through the spark-class script in the bin directory, which in the end launches a java process.

 

The entry point is the main function of the SparkSubmit companion object:

object SparkSubmit extends CommandLineUtils with Logging {

  // Cluster managers -------- the cluster-manager modes Spark supports
  private val YARN = 1
  private val STANDALONE = 2
  private val MESOS = 4
  private val LOCAL = 8
  private val KUBERNETES = 16
  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES

  // Deploy modes
  private val CLIENT = 1
  private val CLUSTER = 2
  private val ALL_DEPLOY_MODES = CLIENT | CLUSTER

  // Special primary resource names that represent shells rather than application jars.
  private val SPARK_SHELL = "spark-shell"
  private val PYSPARK_SHELL = "pyspark-shell"
  private val SPARKR_SHELL = "sparkr-shell"
  private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
  private val R_PACKAGE_ARCHIVE = "rpkg.zip"

  private val CLASS_NOT_FOUND_EXIT_STATUS = 101

  // Following constants are visible for testing.
  private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
    "org.apache.spark.deploy.yarn.YarnClusterApplication"  // submit class for YARN cluster mode
  private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName() // REST-based cluster-mode submit class
  private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName() // standalone cluster-mode submit class
  private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
    "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"  // Kubernetes cluster-mode submit class

  override def main(args: Array[String]): Unit = {
    // build a SparkSubmit instance
    val submit = new SparkSubmit() {
      self =>

      // override SparkSubmit's argument-parsing method
      override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
        // build a SparkSubmitArguments object
        new SparkSubmitArguments(args) {
          // override logInfo and logWarning to delegate to the two methods defined below
          override protected def logInfo(msg: => String): Unit = self.logInfo(msg)

          override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
        }
      }

      override protected def logInfo(msg: => String): Unit = printMessage(msg)

      override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

      override def doSubmit(args: Array[String]): Unit = {
        try {
          super.doSubmit(args)
        } catch {
          case e: SparkUserAppException =>
            exitFn(e.exitCode)
        }
      }

    }

    // perform the submission
    submit.doSubmit(args)
  }

  /**
   * Return whether the given primary resource represents a user jar.
   */
  private[deploy] def isUserJar(res: String): Boolean = {
    !isShell(res) && !isPython(res) && !isInternal(res) && !isR(res)
  }

  /**
   * Return whether the given primary resource represents a shell.
   */
  private[deploy] def isShell(res: String): Boolean = {
    (res == SPARK_SHELL || res == PYSPARK_SHELL || res == SPARKR_SHELL)
  }

  /**
   * Return whether the given main class represents a sql shell.
   */
  private[deploy] def isSqlShell(mainClass: String): Boolean = {
    mainClass == "org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver"
  }

  /**
   * Return whether the given main class represents a thrift server.
   */
  private def isThriftServer(mainClass: String): Boolean = {
    mainClass == "org.apache.spark.sql.hive.thriftserver.HiveThriftServer2"
  }

  /**
   * Return whether the given primary resource requires running python.
   */
  private[deploy] def isPython(res: String): Boolean = {
    res != null && res.endsWith(".py") || res == PYSPARK_SHELL
  }

  /**
   * Return whether the given primary resource requires running R.
   */
  private[deploy] def isR(res: String): Boolean = {
    res != null && res.endsWith(".R") || res == SPARKR_SHELL
  }

  private[deploy] def isInternal(res: String): Boolean = {
    res == SparkLauncher.NO_RESOURCE
  }

}

The doSubmit function in the SparkSubmit class is quite simple:

  def doSubmit(args: Array[String]): Unit = {
    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
    // be reset before the application starts.
    val uninitLog = initializeLogIfNecessary(true, silent = true)

    // parse the arguments
    val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      logInfo(appArgs.toString)
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog) // submit the application
      case SparkSubmitAction.KILL => kill(appArgs) // kill the application (standalone and Mesos clusters only)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) // query the application status (standalone and Mesos clusters only)
      case SparkSubmitAction.PRINT_VERSION => printVersion()  // print version information
    }
  }

This is clearly a dispatcher: depending on the action it receives, it calls the corresponding handler:

a. case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog) --- submit the Spark job

b. case SparkSubmitAction.KILL => kill(appArgs) --- kill the Spark job

c. case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) --- query the job status

d. case SparkSubmitAction.PRINT_VERSION => printVersion() --- print version information

To see which class actually carries out a Spark job submission, we need to step into the submit function:

  /**
   * Submit the application using the provided parameters.
   *
   * This runs in two steps. First, we prepare the launch environment by setting up
   * the appropriate classpath, system properties, and application arguments for
   * running the child main class based on the cluster manager and the deploy mode.
   * Second, we use this launch environment to invoke the main method of the child
   * main class.
   */
  @tailrec
  private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    // Prepare the launch environment (classpath, system properties, application arguments)
    // for the child main class, based on the cluster manager and deploy mode.
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

    def doRunMain(): Unit = {
      if (args.proxyUser != null) {
        val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
          UserGroupInformation.getCurrentUser())
        try {
          proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
            override def run(): Unit = {
              runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
            }
          })
        } catch {
          case e: Exception =>
            // Hadoop's AuthorizationException suppresses the exception's stack trace, which
            // makes the message printed to the output by the JVM not very helpful. Instead,
            // detect exceptions with empty stack traces here, and treat them differently.
            if (e.getStackTrace().length == 0) {
              error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
            } else {
              throw e
            }
        }
      } else {
        runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
      }
    }

    // Let the main class re-initialize the logging system once it starts.
    if (uninitLog) {
      Logging.uninitialize()
    }

    // In standalone cluster mode, there are two submission gateways:
    //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
    //   (2) The new REST-based gateway introduced in Spark 1.3
    // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
    // to use the legacy gateway if the master endpoint turns out to be not a REST server.
    if (args.isStandaloneCluster && args.useRest) {
      try {
        logInfo("Running Spark using the REST application submission protocol.")
        doRunMain()
      } catch {
        // Fail over to use the legacy submission gateway
        case e: SubmitRestConnectionException =>
          logWarning(s"Master endpoint ${args.master} was not a REST server. " +
            "Falling back to legacy submission gateway instead.")
          args.useRest = false
          submit(args, false)
      }
    // In all other modes, just run the main class as prepared
    } else {
      doRunMain()
    }
  }

In this method:

prepareSubmitEnvironment prepares the launch environment (classpath, system properties and application arguments) needed to run the child main class for the chosen cluster manager and deploy mode.

The last line of submit(...) calls the locally defined doRunMain(), which branches on the application argument args.proxyUser:

a. if a proxy user is set, runMain() is wrapped in a proxyUser.doAs(...) call;

b. if not, runMain() is called directly.

(2) Preparing the submission environment

prepareSubmitEnvironment sets up the classpath, system properties and application arguments needed to run the child main class for the given cluster manager and deploy mode:

val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
/**
   * Prepare the environment for submitting an application.
   *
   * @param args the parsed SparkSubmitArguments used for environment preparation.
   * @param conf the Hadoop Configuration, this argument will only be set in unit test.
   * @return a 4-tuple:
   *        (1) the arguments for the child process,
   *        (2) a list of classpath entries for the child,
   *        (3) a map of system properties, and
   *        (4) the main class for the child
   *
   * Exposed for testing.
   */
  private[deploy] def prepareSubmitEnvironment(
      args: SparkSubmitArguments,
      conf: Option[HadoopConfiguration] = None)
      : (Seq[String], Seq[String], SparkConf, String) = {
    // Return values
    val childArgs = new ArrayBuffer[String]()
    val childClasspath = new ArrayBuffer[String]()
    val sparkConf = new SparkConf()
    var childMainClass = ""

    // Set the cluster manager.
    // From this list we can see which cluster managers Spark currently supports:
    // YARN, STANDALONE, MESOS, KUBERNETES and LOCAL, selected via --master in spark-submit.
    val clusterManager: Int = args.master match {
      case "yarn" => YARN
      case "yarn-client" | "yarn-cluster" =>
        // Before Spark 2.0, yarn-client / yarn-cluster could be used as --master values; since 2.0
        // they are deprecated, so they are converted to "yarn" here and a warning is logged.
        logWarning(s"Master ${args.master} is deprecated since 2.0." +
          " Please use master \"yarn\" with specified deploy mode instead.")
        YARN
      case m if m.startsWith("spark") => STANDALONE
      case m if m.startsWith("mesos") => MESOS
      case m if m.startsWith("k8s") => KUBERNETES
      case m if m.startsWith("local") => LOCAL
      case _ =>
        error("Master must either be yarn or start with spark, mesos, k8s, or local")
        -1
    }

    // Set the deploy mode; default is client mode
    var deployMode: Int = args.deployMode match {
      case "client" | null => CLIENT
      case "cluster" => CLUSTER
      case _ =>
        error("Deploy mode must be either client or cluster")
        -1
    }

    // Because the deprecated way of specifying "yarn-cluster" and "yarn-client" encapsulate both
    // the master and deploy mode, we have some logic to infer the master and deploy mode
    // from each other if only one is specified, or exit early if they are at odds.
    if (clusterManager == YARN) {
      (args.master, args.deployMode) match {
        case ("yarn-cluster", null) =>
          deployMode = CLUSTER
          args.master = "yarn"
        case ("yarn-cluster", "client") =>
          error("Client deploy mode is not compatible with master \"yarn-cluster\"")
        case ("yarn-client", "cluster") =>
          error("Cluster deploy mode is not compatible with master \"yarn-client\"")
        case (_, mode) =>
          args.master = "yarn"
      }

      // Make sure YARN is included in our build if we're trying to use it
      if (!Utils.classIsLoadable(YARN_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
        error(
          "Could not load YARN classes. " +
          "This copy of Spark may not have been compiled with YARN support.")
      }
    }

    if (clusterManager == KUBERNETES) {
      args.master = Utils.checkAndGetK8sMasterUrl(args.master)
      // Make sure KUBERNETES is included in our build if we're trying to use it
      if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
        error(
          "Could not load KUBERNETES classes. " +
            "This copy of Spark may not have been compiled with KUBERNETES support.")
      }
    }

    // Fail fast, the following modes are not supported or applicable
    (clusterManager, deployMode) match {
      case (STANDALONE, CLUSTER) if args.isPython =>
        error("Cluster deploy mode is currently not supported for python " +
          "applications on standalone clusters.")
      case (STANDALONE, CLUSTER) if args.isR =>
        error("Cluster deploy mode is currently not supported for R " +
          "applications on standalone clusters.")
      case (LOCAL, CLUSTER) =>
        error("Cluster deploy mode is not compatible with master \"local\"")
      case (_, CLUSTER) if isShell(args.primaryResource) =>
        error("Cluster deploy mode is not applicable to Spark shells.")
      case (_, CLUSTER) if isSqlShell(args.mainClass) =>
        error("Cluster deploy mode is not applicable to Spark SQL shell.")
      case (_, CLUSTER) if isThriftServer(args.mainClass) =>
        error("Cluster deploy mode is not applicable to Spark Thrift server.")
      case _ =>
    }

    // Update args.deployMode if it is null. It will be passed down as a Spark property later.
    (args.deployMode, deployMode) match {
      case (null, CLIENT) => args.deployMode = "client"
      case (null, CLUSTER) => args.deployMode = "cluster"
      case _ =>
    }

    // Derive the special combined run modes from the cluster manager and deploy mode.
    val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
    val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
    val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
    val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
    val isMesosClient = clusterManager == MESOS && deployMode == CLIENT

    if (!isMesosCluster && !isStandAloneCluster) {
      // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
      // too for packages that include Python code
      val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
        args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath,
        args.ivySettingsPath)

      if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
        args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
        if (args.isPython || isInternal(args.primaryResource)) {
          args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
        }
      }

      // install any R packages that may have been passed through --jars or --packages.
      // Spark Packages may contain R source code inside the jar.
      if (args.isR && !StringUtils.isBlank(args.jars)) {
        RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
      }
    }

    args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) }
    val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
    val targetDir = Utils.createTempDir()

    // assure a keytab is available from any place in a JVM
    if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient) {
      if (args.principal != null) {
        if (args.keytab != null) {
          require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
          // Add keytab and principal configurations in sysProps to make them available
          // for later use; e.g. in spark sql, the isolated class loader used to talk
          // to HiveMetastore will use these settings. They will be set as Java system
          // properties and then loaded by SparkConf
          sparkConf.set(KEYTAB, args.keytab)
          sparkConf.set(PRINCIPAL, args.principal)
          UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
        }
      }
    }

    // Resolve glob path for different resources.
    args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
    args.files = Option(args.files).map(resolveGlobPaths(_, hadoopConf)).orNull
    args.pyFiles = Option(args.pyFiles).map(resolveGlobPaths(_, hadoopConf)).orNull
    args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull

    lazy val secMgr = new SecurityManager(sparkConf)

    // In client mode, download remote files.
    var localPrimaryResource: String = null
    var localJars: String = null
    var localPyFiles: String = null
    if (deployMode == CLIENT) {
      localPrimaryResource = Option(args.primaryResource).map {
        downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
      }.orNull
      localJars = Option(args.jars).map {
        downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
      }.orNull
      localPyFiles = Option(args.pyFiles).map {
        downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
      }.orNull
    }

    // When running in YARN, for some remote resources with scheme:
    //   1. Hadoop FileSystem doesn't support them.
    //   2. We explicitly bypass Hadoop FileSystem with "spark.yarn.dist.forceDownloadSchemes".
    // We will download them to local disk prior to add to YARN's distributed cache.
    // For yarn client mode, since we already download them with above code, so we only need to
    // figure out the local path and replace the remote one.
    if (clusterManager == YARN) {
      val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)

      def shouldDownload(scheme: String): Boolean = {
        forceDownloadSchemes.contains("*") || forceDownloadSchemes.contains(scheme) ||
          Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isFailure
      }

      def downloadResource(resource: String): String = {
        val uri = Utils.resolveURI(resource)
        uri.getScheme match {
          case "local" | "file" => resource
          case e if shouldDownload(e) =>
            val file = new File(targetDir, new Path(uri).getName)
            if (file.exists()) {
              file.toURI.toString
            } else {
              downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
            }
          case _ => uri.toString
        }
      }

      args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull
      args.files = Option(args.files).map { files =>
        Utils.stringToSeq(files).map(downloadResource).mkString(",")
      }.orNull
      args.pyFiles = Option(args.pyFiles).map { pyFiles =>
        Utils.stringToSeq(pyFiles).map(downloadResource).mkString(",")
      }.orNull
      args.jars = Option(args.jars).map { jars =>
        Utils.stringToSeq(jars).map(downloadResource).mkString(",")
      }.orNull
      args.archives = Option(args.archives).map { archives =>
        Utils.stringToSeq(archives).map(downloadResource).mkString(",")
      }.orNull
    }

    // If we're running a python app, set the main class to our specific python runner
    if (args.isPython && deployMode == CLIENT) {
      if (args.primaryResource == PYSPARK_SHELL) {
        args.mainClass = "org.apache.spark.api.python.PythonGatewayServer"
      } else {
        // If a python file is provided, add it to the child arguments and list of files to deploy.
        // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
        args.mainClass = "org.apache.spark.deploy.PythonRunner"
        args.childArgs = ArrayBuffer(localPrimaryResource, localPyFiles) ++ args.childArgs
      }
      if (clusterManager != YARN) {
        // The YARN backend handles python files differently, so don't merge the lists.
        args.files = mergeFileLists(args.files, args.pyFiles)
      }
    }

    if (localPyFiles != null) {
      sparkConf.set("spark.submit.pyFiles", localPyFiles)
    }

    // In YARN mode for an R app, add the SparkR package archive and the R package
    // archive containing all of the built R libraries to archives so that they can
    // be distributed with the job
    if (args.isR && clusterManager == YARN) {
      val sparkRPackagePath = RUtils.localSparkRPackagePath
      if (sparkRPackagePath.isEmpty) {
        error("SPARK_HOME does not exist for R application in YARN mode.")
      }
      val sparkRPackageFile = new File(sparkRPackagePath.get, SPARKR_PACKAGE_ARCHIVE)
      if (!sparkRPackageFile.exists()) {
        error(s"$SPARKR_PACKAGE_ARCHIVE does not exist for R application in YARN mode.")
      }
      val sparkRPackageURI = Utils.resolveURI(sparkRPackageFile.getAbsolutePath).toString

      // Distribute the SparkR package.
      // Assigns a symbol link name "sparkr" to the shipped package.
      args.archives = mergeFileLists(args.archives, sparkRPackageURI + "#sparkr")

      // Distribute the R package archive containing all the built R packages.
      if (!RUtils.rPackages.isEmpty) {
        val rPackageFile =
          RPackageUtils.zipRLibraries(new File(RUtils.rPackages.get), R_PACKAGE_ARCHIVE)
        if (!rPackageFile.exists()) {
          error("Failed to zip all the built R packages.")
        }

        val rPackageURI = Utils.resolveURI(rPackageFile.getAbsolutePath).toString
        // Assigns a symbol link name "rpkg" to the shipped package.
        args.archives = mergeFileLists(args.archives, rPackageURI + "#rpkg")
      }
    }

    // TODO: Support distributing R packages with standalone cluster
    if (args.isR && clusterManager == STANDALONE && !RUtils.rPackages.isEmpty) {
      error("Distributing R packages with standalone cluster is not supported.")
    }

    // TODO: Support distributing R packages with mesos cluster
    if (args.isR && clusterManager == MESOS && !RUtils.rPackages.isEmpty) {
      error("Distributing R packages with mesos cluster is not supported.")
    }

    // If we're running an R app, set the main class to our specific R runner
    if (args.isR && deployMode == CLIENT) {
      if (args.primaryResource == SPARKR_SHELL) {
        args.mainClass = "org.apache.spark.api.r.RBackend"
      } else {
        // If an R file is provided, add it to the child arguments and list of files to deploy.
        // Usage: RRunner <main R file> [app arguments]
        args.mainClass = "org.apache.spark.deploy.RRunner"
        args.childArgs = ArrayBuffer(localPrimaryResource) ++ args.childArgs
        args.files = mergeFileLists(args.files, args.primaryResource)
      }
    }

    if (isYarnCluster && args.isR) {
      // In yarn-cluster mode for an R app, add primary resource to files
      // that can be distributed with the job
      args.files = mergeFileLists(args.files, args.primaryResource)
    }

    // Special flag to avoid deprecation warnings at the client
    sys.props("SPARK_SUBMIT") = "true"

    // A list of rules to map each argument to system properties or command-line options in
    // each deploy mode; we iterate through these below
    val options = List[OptionAssigner](

      // All cluster managers
      OptionAssigner(args.master, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.master"),
      OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = "spark.submit.deployMode"),
      OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
      OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
      OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
        confKey = "spark.driver.memory"),
      OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = "spark.driver.extraClassPath"),
      OptionAssigner(args.driverExtraJavaOptions, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = "spark.driver.extraJavaOptions"),
      OptionAssigner(args.driverExtraLibraryPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
        confKey = "spark.driver.extraLibraryPath"),

      // Propagate attributes for dependency resolution at the driver side
      OptionAssigner(args.packages, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.packages"),
      OptionAssigner(args.repositories, STANDALONE | MESOS, CLUSTER,
        confKey = "spark.jars.repositories"),
      OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS, CLUSTER, confKey = "spark.jars.ivy"),
      OptionAssigner(args.packagesExclusions, STANDALONE | MESOS,
        CLUSTER, confKey = "spark.jars.excludes"),

      // Yarn only
      OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
      OptionAssigner(args.numExecutors, YARN, ALL_DEPLOY_MODES,
        confKey = "spark.executor.instances"),
      OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles"),
      OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars"),
      OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files"),
      OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives"),
      OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.principal"),
      OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.keytab"),

      // Other options
      OptionAssigner(args.executorCores, STANDALONE | YARN | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = "spark.executor.cores"),
      OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = "spark.executor.memory"),
      OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = "spark.cores.max"),
      OptionAssigner(args.files, LOCAL | STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = "spark.files"),
      OptionAssigner(args.jars, LOCAL, CLIENT, confKey = "spark.jars"),
      OptionAssigner(args.jars, STANDALONE | MESOS | KUBERNETES, ALL_DEPLOY_MODES,
        confKey = "spark.jars"),
      OptionAssigner(args.driverMemory, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
        confKey = "spark.driver.memory"),
      OptionAssigner(args.driverCores, STANDALONE | MESOS | YARN | KUBERNETES, CLUSTER,
        confKey = "spark.driver.cores"),
      OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
        confKey = "spark.driver.supervise"),
      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"),

      // An internal option used only for spark-shell to add user jars to repl's classloader,
      // previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
      // remote jars, so adding a new option to only specify local jars for spark-shell internally.
      OptionAssigner(localJars, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.repl.local.jars")
    )

    // In client mode, launch the application main class directly
    // In addition, add the main application jar and any added jars (if any) to the classpath
    if (deployMode == CLIENT) {
      childMainClass = args.mainClass
      if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
        childClasspath += localPrimaryResource
      }
      if (localJars != null) { childClasspath ++= localJars.split(",") }
    }
    // Add the main application jar and any added jars to classpath in case YARN client
    // requires these jars.
    // This assumes both primaryResource and user jars are local jars, or already downloaded
    // to local by configuring "spark.yarn.dist.forceDownloadSchemes", otherwise it will not be
    // added to the classpath of YARN client.
    if (isYarnCluster) {
      if (isUserJar(args.primaryResource)) {
        childClasspath += args.primaryResource
      }
      if (args.jars != null) { childClasspath ++= args.jars.split(",") }
    }

    if (deployMode == CLIENT) {
      if (args.childArgs != null) { childArgs ++= args.childArgs }
    }

    // Map all arguments to command-line options or system properties for our chosen mode
    for (opt <- options) {
      if (opt.value != null &&
          (deployMode & opt.deployMode) != 0 &&
          (clusterManager & opt.clusterManager) != 0) {
        if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
        if (opt.confKey != null) { sparkConf.set(opt.confKey, opt.value) }
      }
    }

    // In case of shells, spark.ui.showConsoleProgress can be true by default or by user.
    if (isShell(args.primaryResource) && !sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS)) {
      sparkConf.set(UI_SHOW_CONSOLE_PROGRESS, true)
    }

    // Add the application jar automatically so the user doesn't have to call sc.addJar
    // For YARN cluster mode, the jar is already distributed on each node as "app.jar"
    // For python and R files, the primary resource is already distributed as a regular file
    if (!isYarnCluster && !args.isPython && !args.isR) {
      var jars = sparkConf.getOption("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq.empty)
      if (isUserJar(args.primaryResource)) {
        jars = jars ++ Seq(args.primaryResource)
      }
      sparkConf.set("spark.jars", jars.mkString(","))
    }

    // In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
    // All Spark parameters are expected to be passed to the client through system properties.
    if (args.isStandaloneCluster) {
      if (args.useRest) {
        childMainClass = REST_CLUSTER_SUBMIT_CLASS
        childArgs += (args.primaryResource, args.mainClass)
      } else {
        // In legacy standalone cluster mode, use Client as a wrapper around the user class
        childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
        if (args.supervise) { childArgs += "--supervise" }
        Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
        Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
        childArgs += "launch"
        childArgs += (args.master, args.primaryResource, args.mainClass)
      }
      if (args.childArgs != null) {
        childArgs ++= args.childArgs
      }
    }

    // Let YARN know it's a pyspark app, so it distributes needed libraries.
    if (clusterManager == YARN) {
      if (args.isPython) {
        sparkConf.set("spark.yarn.isPython", "true")
      }
    }

    if (clusterManager == MESOS && UserGroupInformation.isSecurityEnabled) {
      setRMPrincipal(sparkConf)
    }

    // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
    if (isYarnCluster) {
      childMainClass = YARN_CLUSTER_SUBMIT_CLASS
      if (args.isPython) {
        childArgs += ("--primary-py-file", args.primaryResource)
        childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
      } else if (args.isR) {
        val mainFile = new Path(args.primaryResource).getName
        childArgs += ("--primary-r-file", mainFile)
        childArgs += ("--class", "org.apache.spark.deploy.RRunner")
      } else {
        if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
          childArgs += ("--jar", args.primaryResource)
        }
        childArgs += ("--class", args.mainClass)
      }
      if (args.childArgs != null) {
        args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
      }
    }

    if (isMesosCluster) {
      assert(args.useRest, "Mesos cluster mode is only supported through the REST submission API")
      childMainClass = REST_CLUSTER_SUBMIT_CLASS
      if (args.isPython) {
        // Second argument is main class
        childArgs += (args.primaryResource, "")
        if (args.pyFiles != null) {
          sparkConf.set("spark.submit.pyFiles", args.pyFiles)
        }
      } else if (args.isR) {
        // Second argument is main class
        childArgs += (args.primaryResource, "")
      } else {
        childArgs += (args.primaryResource, args.mainClass)
      }
      if (args.childArgs != null) {
        childArgs ++= args.childArgs
      }
    }

    if (isKubernetesCluster) {
      childMainClass = KUBERNETES_CLUSTER_SUBMIT_CLASS
      if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
        if (args.isPython) {
          childArgs ++= Array("--primary-py-file", args.primaryResource)
          childArgs ++= Array("--main-class", "org.apache.spark.deploy.PythonRunner")
          if (args.pyFiles != null) {
            childArgs ++= Array("--other-py-files", args.pyFiles)
          }
        } else if (args.isR) {
          childArgs ++= Array("--primary-r-file", args.primaryResource)
          childArgs ++= Array("--main-class", "org.apache.spark.deploy.RRunner")
        }
        else {
          childArgs ++= Array("--primary-java-resource", args.primaryResource)
          childArgs ++= Array("--main-class", args.mainClass)
        }
      } else {
        childArgs ++= Array("--main-class", args.mainClass)
      }
      if (args.childArgs != null) {
        args.childArgs.foreach { arg =>
          childArgs += ("--arg", arg)
        }
      }
    }

    // Load any properties specified through --conf and the default properties file
    for ((k, v) <- args.sparkProperties) {
      sparkConf.setIfMissing(k, v)
    }

    // Ignore invalid spark.driver.host in cluster modes.
    if (deployMode == CLUSTER) {
      sparkConf.remove("spark.driver.host")
    }

    // Resolve paths in certain spark properties
    val pathConfigs = Seq(
      "spark.jars",
      "spark.files",
      "spark.yarn.dist.files",
      "spark.yarn.dist.archives",
      "spark.yarn.dist.jars")
    pathConfigs.foreach { config =>
      // Replace old URIs with resolved URIs, if they exist
      sparkConf.getOption(config).foreach { oldValue =>
        sparkConf.set(config, Utils.resolveURIs(oldValue))
      }
    }

    // Resolve and format python file paths properly before adding them to the PYTHONPATH.
    // The resolving part is redundant in the case of --py-files, but necessary if the user
    // explicitly sets `spark.submit.pyFiles` in his/her default properties file.
    sparkConf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
      val resolvedPyFiles = Utils.resolveURIs(pyFiles)
      val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
        PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
      } else {
        // Ignoring formatting python path in yarn and mesos cluster mode, these two modes
        // support dealing with remote python files, they could distribute and add python files
        // locally.
        resolvedPyFiles
      }
      sparkConf.set("spark.submit.pyFiles", formattedPyFiles)
    }

    (childArgs, childClasspath, sparkConf, childMainClass)
  }

In this method:

a. when the deploy mode is client, the child main class is the user class passed to spark-submit, i.e. childMainClass = args.mainClass;

b. when the master is YARN and the deploy mode is cluster, the child main class is org.apache.spark.deploy.yarn.YarnClusterApplication.
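
Condensing the branches above, the way childMainClass is chosen can be summarized roughly as follows. This is a simplified sketch, not the literal source; chooseChildMainClass is a hypothetical helper introduced only for illustration, using the constants defined in the SparkSubmit object quoted earlier.

// Simplified sketch of how prepareSubmitEnvironment picks childMainClass.
// chooseChildMainClass is a hypothetical helper, not part of the Spark source.
def chooseChildMainClass(clusterManager: Int, deployMode: Int,
    args: SparkSubmitArguments): String = {
  val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
  val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
  val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
  if (deployMode == CLIENT) {
    args.mainClass                        // user class runs inside this SparkSubmit JVM
  } else if (isYarnCluster) {
    YARN_CLUSTER_SUBMIT_CLASS             // org.apache.spark.deploy.yarn.YarnClusterApplication
  } else if (args.isStandaloneCluster && args.useRest) {
    REST_CLUSTER_SUBMIT_CLASS             // RestSubmissionClientApp (REST gateway, Spark 1.3+)
  } else if (args.isStandaloneCluster) {
    STANDALONE_CLUSTER_SUBMIT_CLASS       // legacy ClientApp wrapper
  } else if (isMesosCluster) {
    REST_CLUSTER_SUBMIT_CLASS             // Mesos cluster mode also goes through the REST client
  } else if (isKubernetesCluster) {
    KUBERNETES_CLUSTER_SUBMIT_CLASS       // KubernetesClientApplication
  } else {
    args.mainClass
  }
}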

1) Preparing the execution class for YARN (the cluster manager)

When a job is launched with spark-submit, what actually runs is exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@".

In SparkSubmit, the prepareSubmitEnvironment method prepares everything related to the runtime environment for the submission:

private[deploy] def prepareSubmitEnvironment(args: SparkSubmitArguments, conf: Option[HadoopConfiguration] = None): (Seq[String], Seq[String], SparkConf, String)

Inside that method, when the cluster manager is YARN:

a. with --deploy-mode cluster

YarnClusterApplication is used for the submission. YarnClusterApplication is an inner class of org.apache.spark.deploy.yarn.Client; it creates a new Client object and calls its run method:

override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove("spark.jars")
    conf.remove("spark.files")

    new Client(new ClientArguments(args), conf).run()
  }

b. with --deploy-mode client

the main function of the application jar itself is invoked, and what runs is JavaMainApplication:

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.deploy

import java.lang.reflect.Modifier

import org.apache.spark.SparkConf

/**
 * Entry point for a Spark application. Implementations must provide a no-argument constructor.
 */
private[spark] trait SparkApplication {

  def start(args: Array[String], conf: SparkConf): Unit

}

/**
 * Implementation of SparkApplication that wraps a standard Java class with a "main" method.
 *
 * Configuration is propagated to the application via system properties, so running multiple
 * of these in the same JVM may lead to undefined behavior due to configuration leaks.
 */
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }

    mainMethod.invoke(null, args)
  }

}

As the JavaMainApplication implementation shows, its start method simply invokes the main function of the application jar via reflection.
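
For context, runMain (called from doRunMain above but not quoted in full in this article) is where this choice is made: if the child main class implements SparkApplication (as YarnClusterApplication does) it is instantiated directly, otherwise it is wrapped in JavaMainApplication. A simplified sketch of that part of SparkSubmit#runMain in branch-2.4, where mainClass is the Class[_] loaded from the childMainClass returned by prepareSubmitEnvironment:

// Simplified sketch of the relevant part of SparkSubmit#runMain (branch-2.4).
val app: SparkApplication =
  if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    // e.g. YarnClusterApplication, RestSubmissionClientApp, ClientApp
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    // a plain user class: wrap it so its static main() is invoked via reflection
    new JavaMainApplication(mainClass)
  }
app.start(childArgs.toArray, sparkConf)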

(3) Submitting to YARN

1) The yarn-cluster flow:

In yarn-cluster mode, what runs inside YarnClusterApplication is Client's run method; Client#run() implements the submission flow:

/**
   * Submit an application to the ResourceManager.
   * If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
   * reporting the application's status until the application has exited for any reason.
   * Otherwise, the client process will exit after submission.
   * If the application finishes with a failed, killed, or undefined status,
   * throw an appropriate SparkException.
   */
  def run(): Unit = {
    this.appId = submitApplication()
    if (!launcherBackend.isConnected() && fireAndForget) {
      val report = getApplicationReport(appId)
      val state = report.getYarnApplicationState
      logInfo(s"Application report for $appId (state: $state)")
      logInfo(formatReportDetails(report))
      if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
        throw new SparkException(s"Application $appId finished with status: $state")
      }
    } else {
      val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
      if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
        diags.foreach { err =>
          logError(s"Application diagnostics message: $err")
        }
        throw new SparkException(s"Application $appId finished with failed status")
      }
      if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
        throw new SparkException(s"Application $appId is killed")
      }
      if (finalState == FinalApplicationStatus.UNDEFINED) {
        throw new SparkException(s"The final status of application $appId is undefined")
      }
    }
  }

Client's run() method calls submitApplication(), which is implemented as follows:

/**
   * Submit an application running our ApplicationMaster to the ResourceManager.
   *
   * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
   * creating applications and setting up the application submission context. This was not
   * available in the alpha API.
   */
  def submitApplication(): ApplicationId = {
    var appId: ApplicationId = null
    try {
      launcherBackend.connect()
      yarnClient.init(hadoopConf)
      yarnClient.start()

      logInfo("Requesting a new application from cluster with %d NodeManagers"
        .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

      // Get a new application from our RM
      val newApp = yarnClient.createApplication()
      val newAppResponse = newApp.getNewApplicationResponse()
      appId = newAppResponse.getApplicationId()

      new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
        Option(appId.toString)).setCurrentContext()

      // Verify whether the cluster has enough resources for our AM
      verifyClusterResources(newAppResponse)

      // Set up the appropriate contexts to launch our AM
      val containerContext = createContainerLaunchContext(newAppResponse)
      val appContext = createApplicationSubmissionContext(newApp, containerContext)

      // Finally, submit and monitor the application
      logInfo(s"Submitting application $appId to ResourceManager")
      yarnClient.submitApplication(appContext)
      launcherBackend.setAppId(appId.toString)
      reportLauncherState(SparkAppHandle.State.SUBMITTED)

      appId
    } catch {
      case e: Throwable =>
        if (appId != null) {
          cleanupStagingDir(appId)
        }
        throw e
    }
  }

 

run() submits the job to YARN's ResourceManager (RM below) and runs our ApplicationMaster (AM below).

The stable YARN API provides a convenience method (YarnClient#createApplication) for creating an application and setting up the application submission context.

submitApplication() performs the following steps:

- Initialize and start a YarnClient; the rest of the flow uses the APIs it provides.

- Call yarnClient#createApplication() to obtain a new application (newApp) from the RM; this newApp is used to run the AM. newApp#getNewApplicationResponse() returns the resource information (newAppResponse) the newApp needs.

- Use newAppResponse to verify that the cluster has enough resources to run the AM.

- Set up the appropriate contexts to launch the AM.

- Call yarnClient#submitApplication(appContext) to ask YARN to launch the application, and then monitor it.

2) The yarn-client flow:

  • In client deploy mode, SparkSubmit's main function invokes the application's main method via reflection.
  • The application's main method creates a SparkContext instance.
  • While the SparkContext instance is being created, the Scheduler and Backend instances are created by the following statements:
  private var _schedulerBackend: SchedulerBackend = _
  private var _taskScheduler: TaskScheduler = _
  
  private[spark] def schedulerBackend: SchedulerBackend = _schedulerBackend

  private[spark] def taskScheduler: TaskScheduler = _taskScheduler
  private[spark] def taskScheduler_=(ts: TaskScheduler): Unit = {
    _taskScheduler = ts
  }
  
  // assigned in the SparkContext constructor body:
    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala

How SparkContext is initialized

In YARN mode, where SparkContext is initialized depends on --deploy-mode:

yarn-cluster mode: the client first asks the RM (YARN ResourceManager) for a Container in which to start the AM (ApplicationMaster) process, and SparkContext runs inside that AM process;

yarn-client mode: SparkContext is initialized on the submitting node, invoked from the client-side class (JavaMainApplication).
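
To make the yarn-client case concrete, here is a made-up user application (MyApp is not part of Spark, just an illustration). In yarn-client mode its main method runs inside the spark-submit JVM on the submitting node, and the new SparkContext(conf) call is what triggers the scheduler creation shown below:

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical user application, used only to show where SparkContext is created.
object MyApp {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MyApp")  // --master yarn / --deploy-mode come from spark-submit
    val sc = new SparkContext(conf)                 // calls SparkContext.createTaskScheduler(...) internally
    try {
      println(sc.parallelize(1 to 100).sum())       // a trivial job
    } finally {
      sc.stop()
    }
  }
}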

/**
   * Create a task scheduler based on a given master URL.
   * Return a 2-tuple of the scheduler backend and the task scheduler.
   */
  private def createTaskScheduler(...): (SchedulerBackend, TaskScheduler) = {
    ...
    master match {
      case "local" =>
        ...
      case LOCAL_N_REGEX(threads) =>
        ...
      case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
        ...
      case SPARK_REGEX(sparkUrl) =>
        ...
      case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
        ...
      case masterUrl =>
        val cm = getClusterManager(masterUrl) match {
          case Some(clusterMgr) => clusterMgr
          case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
        }
        try {
          val scheduler = cm.createTaskScheduler(sc, masterUrl)
          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
          cm.initialize(scheduler, backend)
          (backend, scheduler)
        } catch {
          case se: SparkException => throw se
          case NonFatal(e) =>
            throw new SparkException("External scheduler cannot be instantiated", e)
        }
    }
  }

  private def getClusterManager(url: String): Option[ExternalClusterManager] = {
    val loader = Utils.getContextOrSparkClassLoader
    val serviceLoaders =
      ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
    if (serviceLoaders.size > 1) {
      throw new SparkException(
        s"Multiple external cluster managers registered for the url $url: $serviceLoaders")
    }
    serviceLoaders.headOption
  }  

https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/SparkContext.scala

1) SparkContext#createTaskScheduler(...)

Creates different TaskScheduler and SchedulerBackend implementations depending on the cluster manager.

  1.1) The SchedulerBackend talks to the cluster manager to obtain the resources allocated to the application.

  1.2) The TaskScheduler schedules across jobs, receives the allocated resources, and then assigns resources to each Task.

2) SparkContext#createTaskScheduler(...)

The last match case handles the other resource managers (the external cluster managers Mesos, YARN and Kubernetes, i.e. everything other than local and standalone spark:// URLs).

Under case masterUrl, SparkContext#createTaskScheduler calls getClusterManager(masterUrl), which returns an instance of YarnClusterManager, a class that implements the ExternalClusterManager trait.
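
getClusterManager (shown in the listing above) finds YarnClusterManager through Java's ServiceLoader: the spark-yarn module registers it in a META-INF/services/org.apache.spark.scheduler.ExternalClusterManager resource file whose content is the class name org.apache.spark.scheduler.cluster.YarnClusterManager. A minimal sketch of that lookup is shown below; it assumes the code lives under the org.apache.spark package, since ExternalClusterManager is private[spark].

package org.apache.spark.scheduler

import java.util.ServiceLoader
import scala.collection.JavaConverters._

import org.apache.spark.util.Utils

// Sketch: resolve an ExternalClusterManager for a master URL via ServiceLoader.
// With spark-yarn on the classpath, "yarn" resolves to YarnClusterManager because
// its canCreate(masterURL) returns true only for the literal string "yarn".
object ClusterManagerLookup {
  def lookup(masterUrl: String): Option[ExternalClusterManager] = {
    val loader = Utils.getContextOrSparkClassLoader
    ServiceLoader.load(classOf[ExternalClusterManager], loader)
      .asScala
      .find(_.canCreate(masterUrl))
  }
}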

Note: other classes implementing ExternalClusterManager include:

MesosClusterManager (https://github.com/apache/spark/blob/branch-2.4/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala)

KubernetesClusterManager (https://github.com/apache/spark/blob/branch-2.4/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala)

The ExternalClusterManager trait is defined as follows:

private[spark] trait ExternalClusterManager {
  def canCreate(masterURL: String): Boolean

  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
  
  def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend
      
  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}

https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala

The ExternalClusterManager trait declares four methods:

- canCreate(masterURL: String): Boolean  checks whether this cluster manager instance can create scheduler components for the given masterURL.

- createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler  creates a TaskScheduler instance for the given SparkContext.

- createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend  creates a SchedulerBackend for the given SparkContext and scheduler; called after the TaskScheduler has been created with ExternalClusterManager.createTaskScheduler().

- initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit  initializes the TaskScheduler and SchedulerBackend; called after the scheduler components have been created.

The YarnClusterManager class is defined as follows:

private[spark] class YarnClusterManager extends ExternalClusterManager {

  override def canCreate(masterURL: String): Boolean = {
    masterURL == "yarn"
  }

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    sc.deployMode match {
      case "cluster" => new YarnClusterScheduler(sc)
      case "client" => new YarnScheduler(sc)
      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    sc.deployMode match {
      case "cluster" =>
        new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case "client" =>
        new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case  _ =>
        throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}

https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterManager.scala

YarnClusterManager#createTaskScheduler(...)

This method branches on the deployMode property of the SparkContext:

client: returns a YarnScheduler instance (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala);

cluster: returns a YarnClusterScheduler instance (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala).

YarnClusterManager#createSchedulerBackend(...)

This method also branches on the deployMode property of the SparkContext:

client: returns a YarnClientSchedulerBackend instance (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala);

cluster: returns a YarnClusterSchedulerBackend instance (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala).
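
In yarn-client mode, the submission to YARN happens when this backend starts: YarnClientSchedulerBackend packs the driver's host:port into ClientArguments and reuses the same org.apache.spark.deploy.yarn.Client described above for cluster mode. The following is an abridged sketch of YarnClientSchedulerBackend#start in branch-2.4, with error handling and several details omitted:

// Abridged sketch of YarnClientSchedulerBackend#start (branch-2.4); several details omitted.
override def start() {
  val driverHost = conf.get("spark.driver.host")
  val driverPort = conf.get("spark.driver.port")
  val hostport = driverHost + ":" + driverPort

  // The AM (ExecutorLauncher in client mode) needs to know where the driver lives.
  val argsArrayBuf = new ArrayBuffer[String]()
  argsArrayBuf += ("--arg", hostport)

  val args = new ClientArguments(argsArrayBuf.toArray)
  client = new Client(args, conf)                 // the same Client used in yarn-cluster mode
  bindToYarn(client.submitApplication(), None)    // submit to the RM and remember the appId

  super.start()
  waitForApplication()                            // block until YARN reports the app as RUNNING
  // ... then start a monitor thread that watches the application state ...
}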

An overview of how jobs run on YARN:

1. Spark on YARN in Cluster mode: the whole process from job submission to job completion.

Client-side steps

  • 1. Initialize a yarnClient from yarnConf and start it;
  • 2. Create a client-side application and obtain its application ID, then check whether the cluster has enough resources for the executors and the ApplicationMaster; if not, an IllegalArgumentException is thrown;
  • 3. Set up resources and environment variables: the application's staging directory, local resources (jar files, log4j.properties), the application's environment variables, the Container launch context, and so on;
  • 4. Set up the application submission context: the application name, the queue, the Container requested for the AM, and mark the application type as Spark;
  • 5. Request memory and finally submit the application to the ResourceManager via yarnClient.submitApplication.

  Once the job has been submitted to YARN, the client is done; the terminal process can even be killed, because the whole job runs on the YARN cluster and its results are written to HDFS or to the logs.

After submission to the YARN cluster, on the YARN side

  • 1. Run ApplicationMaster's run method;
  • 2. Set up the relevant environment variables;
  • 3. Create and start the amClient;
  • 4. Install the Spark UI's AmIpFilter before the Spark UI starts;
  • 5. In startUserClass, start a dedicated thread (named "Driver") to run the user-submitted application, i.e. the Driver; SparkContext is initialized inside the Driver;
  • 6. Wait for SparkContext initialization to finish, for at most spark.yarn.applicationMaster.waitTries attempts (default 10); if the configured number of attempts is exceeded the program exits, otherwise the SparkContext is used to initialize the yarnAllocator;

  How does the AM know that SparkContext initialization has finished?
  SparkContext is initialized while the application is started in step 5; during that initialization a YarnClusterScheduler is created, and when SparkContext initialization completes, the postStartHook method of YarnClusterScheduler is called, which notifies the ApplicationMaster that SparkContext is ready (see the sketch after this list).

  • 7. Once SparkContext and the Driver are initialized, register the ApplicationMaster with the ResourceManager through the amClient;
  • 8. Allocate and launch executors. Before launching them, the yarnAllocator first obtains numExecutors Containers, and the executors are started inside those Containers. If the number of executor launch failures reaches maxNumExecutorFailures, which is computed as follows:
// Default to numExecutors * 2, with minimum of 3
private val maxNumExecutorFailures =sparkConf.getInt("spark.yarn.max.executor.failures",
sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors *2,3)))

  then the application fails: its status is marked FAILED and the SparkContext is shut down. Executors are actually launched through ExecutorRunnable, and ExecutorRunnable internally starts CoarseGrainedExecutorBackend.

  • 9. Finally, tasks run inside CoarseGrainedExecutorBackend, and their status is reported back to CoarseGrainedScheduler over the RPC layer (Akka in the older Spark versions this description is based on), until the job completes.
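
The hook mentioned in the note above is YarnClusterScheduler#postStartHook. Abridged from branch-2.4, it is what hands the initialized SparkContext over to the ApplicationMaster:

// Abridged from YarnClusterScheduler (branch-2.4). SparkContext invokes postStartHook()
// at the end of its constructor; in yarn-cluster mode this tells the ApplicationMaster
// that the driver's SparkContext is ready, unblocking the AM's wait.
private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {

  logInfo("Created YarnClusterScheduler")

  override def postStartHook() {
    ApplicationMaster.sparkContextInitialized(sc)
    super.postStartHook()
    logInfo("YarnClusterScheduler.postStartHook done")
  }
}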

2. Spark on YARN in client mode: the full job flow

Spark on YARN has two modes: yarn-cluster and yarn-client. Although both run on YARN, they work quite differently; this part walks through how a yarn-client job goes from submission to execution.
  As in yarn-cluster mode, the whole program is submitted through the spark-submit script. In yarn-client mode, however, the job is not wrapped and launched through the Client class; instead the job's main function is called directly via reflection. The analysis:

  • 1. SparkSubmit's launch function calls the job's main function directly (via reflection); in cluster mode it would call Client's main function instead.
  • 2. The application's main function always has a SparkContext and initializes it;
  • 3. SparkContext initialization does the following in order: set the relevant configuration; register MapOutputTracker, BlockManagerMaster and BlockManager; and create the taskScheduler and dagScheduler, the last two being the most important. When the taskScheduler is created, the Scheduler and SchedulerBackend are chosen according to the master that was passed in. Since yarn-client mode is selected, the program picks YarnClientClusterScheduler and YarnClientSchedulerBackend, and initializes YarnClientClusterScheduler with the YarnClientSchedulerBackend instance; both instances are obtained via reflection. YarnClientSchedulerBackend is a subclass of CoarseGrainedSchedulerBackend, and YarnClientClusterScheduler is a subclass of TaskSchedulerImpl that only overrides TaskSchedulerImpl's getRackForHost method.
  • 4. After the taskScheduler is initialized, the dagScheduler is created and the taskScheduler is started via taskScheduler.start(); starting the taskScheduler also calls the SchedulerBackend's start method. While the SchedulerBackend starts, some parameters are initialized, packed into ClientArguments, and passed into the Client class, and client.runApp() returns the application ID.
  • 5. What client.runApp does is similar to the client-side steps described above; the difference is that what it launches is ExecutorLauncher (yarn-cluster mode launches ApplicationMaster).
  • 6. ExecutorLauncher initializes and starts the amClient, then registers the ApplicationMaster for this application. After registration it waits for the driver to start; once the driver is up, a MonitorActor object is created to communicate with CoarseGrainedSchedulerBackend (they only communicate for the AddWebUIFilter event; task status is not reported to CoarseGrainedSchedulerBackend through it). Then addAmIpFilter is set, and when the job finishes, ExecutorLauncher sets the application's status to FinalApplicationStatus.SUCCEEDED through the amClient.
  • 7. Executors are allocated; the allocation logic is similar to yarn-cluster mode and is not repeated here.
  • 8. Finally, tasks run inside CoarseGrainedExecutorBackend, and their status is reported back to CoarseGrainedScheduler (via Akka in older Spark versions), until the job completes.
  • 9. While the job runs, YarnClientSchedulerBackend polls the job status through the client every second and prints the corresponding information; when the application's status becomes one of FINISHED, FAILED or KILLED, the program stops waiting.
  • 10. Finally, a thread confirms the application status one more time; once it is FINISHED, FAILED or KILLED, the run is complete and the SparkContext is stopped. That concludes the whole process.

 

YARN-Cluster runtime architecture

In YARN-Cluster mode, after a user submits an application to YARN, YARN runs the application in two phases:

  • 1. In the first phase, Spark's Driver is started in the YARN cluster as an ApplicationMaster;
  • 2. In the second phase, the ApplicationMaster creates the application, requests resources for it from the ResourceManager, and starts Executors to run the Tasks, while monitoring the whole run until it completes.

In detail:

  • The Spark Yarn Client submits the application to YARN, including the ApplicationMaster program, the command to start the ApplicationMaster, and the program to be run in the Executors;
  • After receiving the request, the ResourceManager selects a NodeManager in the cluster and allocates the first Container for the application, asking the NodeManager to start the application's ApplicationMaster in that Container; the ApplicationMaster then performs the initialization of SparkContext and so on;
  • The ApplicationMaster registers with the ResourceManager, so the user can check the application's running state directly through the ResourceManager; it then requests resources for the individual tasks over RPC in a polling fashion and monitors them until the run ends;
  • Once the ApplicationMaster has obtained resources (i.e. Containers), it communicates with the corresponding NodeManagers and asks them to start CoarseGrainedExecutorBackend in the obtained Containers. After starting, CoarseGrainedExecutorBackend registers with the SparkContext in the ApplicationMaster and requests Tasks. This is the same as in Standalone mode, except that when SparkContext is initialized in the Spark application, task scheduling is done by CoarseGrainedSchedulerBackend together with YarnClusterScheduler, where YarnClusterScheduler is just a thin wrapper around TaskSchedulerImpl that adds logic such as waiting for Executors;
  • The SparkContext in the ApplicationMaster assigns Tasks to CoarseGrainedExecutorBackend for execution; CoarseGrainedExecutorBackend runs the Tasks and reports their status and progress to the ApplicationMaster, so the ApplicationMaster can track each task and restart failed tasks;
  • After the application finishes running, the ApplicationMaster asks the ResourceManager to deregister it and shuts itself down;

Tracing the CoarseGrainedExecutorBackend launch script:


YARN-Client runtime architecture

In detail:

  • The Spark Yarn Client asks YARN's ResourceManager to start the Application Master. During SparkContext initialization, the DAGScheduler and TaskScheduler are created; since Yarn-Client mode is selected, the program picks YarnScheduler and YarnClientSchedulerBackend;
  • After receiving the request, the ResourceManager selects a NodeManager in the cluster and allocates the first Container for the application, asking it to start the application's ApplicationMaster in that Container. The difference from YARN-Cluster is that this ApplicationMaster does not run SparkContext; it only liaises with the SparkContext for resource allocation;
  • After the SparkContext in the Client finishes initializing, it establishes communication with the ApplicationMaster, registers with the ResourceManager, and requests resources (Containers) from the ResourceManager according to the task requirements;
  • Once the ApplicationMaster has obtained resources (i.e. Containers), it communicates with the corresponding NodeManagers and asks them to start CoarseGrainedExecutorBackend in the obtained Containers; after starting, CoarseGrainedExecutorBackend registers with the SparkContext in the Client and requests Tasks;
  • The SparkContext in the Client assigns Tasks to CoarseGrainedExecutorBackend for execution; CoarseGrainedExecutorBackend runs the Tasks and reports their status and progress to the Driver, so the Client can track each task and restart failed tasks;
  • After the application finishes running, the Client's SparkContext asks the ResourceManager to deregister it and shuts itself down.

Client mode vs Cluster mode

    • Before going into the deeper difference between YARN-Client and YARN-Cluster, one concept needs to be clear: the Application Master. In YARN, every application instance has an ApplicationMaster process, which is the first container started for the application. It is responsible for dealing with the ResourceManager to request resources, and after obtaining them it tells the NodeManager to start Containers for it. At a deeper level, the difference between YARN-Cluster and YARN-Client is really the difference in the ApplicationMaster process;
    • In YARN-Cluster mode, the Driver runs inside the AM (Application Master), which is responsible for requesting resources from YARN and supervising the job. After the user submits the job, the Client can be closed and the job keeps running on YARN, so YARN-Cluster mode is not suitable for interactive jobs;
    • In YARN-Client mode, the Application Master only requests Executors from YARN; the Client communicates with the requested Containers to schedule their work, which means the Client cannot go away.

