Overview:
This article walks through the code path that starts when spark-submit is executed and ends when the job has been submitted to YARN.
The spark-submit entry point
A Spark job is normally submitted with spark-submit:
# Run on a Spark standalone cluster
./bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master spark://207.184.161.138:7077 \
  --executor-memory 20G \
  --total-executor-cores 100 \
  /path/to/examples.jar \
  1000
This example submits to a standalone cluster. The spark-submit script itself can be found at:
https://github.com/apache/spark/blob/branch-2.4/bin/spark-submit
or under a Spark 2.4 installation directory:
[cp011@CDH-103 bin]$ more opt/cloudera/parcels/SPARK2-2.4.0.cloudera1-1.cdh5.13.3.p0.1007356/lib/spark2/bin/spark-submit
#!/usr/bin/env bash

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
As the script shows, spark-submit ultimately delegates the submission to the SparkSubmit class.
The main entry point is in the SparkSubmit companion object:
object SparkSubmit extends CommandLineUtils with Logging {

  // Cluster managers
  private val YARN = 1
  private val STANDALONE = 2
  private val MESOS = 4
  private val LOCAL = 8
  private val KUBERNETES = 16
  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL | KUBERNETES

  // Deploy modes
  private val CLIENT = 1
  private val CLUSTER = 2
  private val ALL_DEPLOY_MODES = CLIENT | CLUSTER

  // Special primary resource names that represent shells rather than application jars.
  private val SPARK_SHELL = "spark-shell"
  private val PYSPARK_SHELL = "pyspark-shell"
  private val SPARKR_SHELL = "sparkr-shell"
  private val SPARKR_PACKAGE_ARCHIVE = "sparkr.zip"
  private val R_PACKAGE_ARCHIVE = "rpkg.zip"

  private val CLASS_NOT_FOUND_EXIT_STATUS = 101

  // Following constants are visible for testing.
  private[deploy] val YARN_CLUSTER_SUBMIT_CLASS =
    "org.apache.spark.deploy.yarn.YarnClusterApplication"
  private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
  private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
  private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS =
    "org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"

  override def main(args: Array[String]): Unit = {
    val submit = new SparkSubmit() {
      self =>

      override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
        new SparkSubmitArguments(args) {
          override protected def logInfo(msg: => String): Unit = self.logInfo(msg)

          override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
        }
      }

      override protected def logInfo(msg: => String): Unit = printMessage(msg)

      override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")

      override def doSubmit(args: Array[String]): Unit = {
        try {
          super.doSubmit(args)
        } catch {
          case e: SparkUserAppException =>
            exitFn(e.exitCode)
        }
      }

    }

    submit.doSubmit(args)
  }

  ...
}
The doSubmit function in the SparkSubmit class is straightforward:
def doSubmit(args: Array[String]): Unit = {
  // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
  // be reset before the application starts.
  val uninitLog = initializeLogIfNecessary(true, silent = true)

  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
It is easy to see that this is a dispatcher: depending on the action it receives, it invokes the corresponding handler:
- case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog) --- submit a Spark job
- case SparkSubmitAction.KILL => kill(appArgs) --- kill a Spark job
- case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs) --- query a job's status
- case SparkSubmitAction.PRINT_VERSION => printVersion() --- print version information
To see how a Spark job submission is actually carried out, we need to step into the submit function:
/**
 * Submit the application using the provided parameters.
 *
 * This runs in two steps. First, we prepare the launch environment by setting up
 * the appropriate classpath, system properties, and application arguments for
 * running the child main class based on the cluster manager and the deploy mode.
 * Second, we use this launch environment to invoke the main method of the child
 * main class.
 */
@tailrec
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // Prepare the launch environment (classpath, system properties, application arguments)
  // for running the child main class, based on the cluster manager and deploy mode.
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)

  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
        UserGroupInformation.getCurrentUser())
      try {
        proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
          }
        })
      } catch {
        case e: Exception =>
          // Hadoop's AuthorizationException suppresses the exception's stack trace, which
          // makes the message printed to the output by the JVM not very helpful. Instead,
          // detect exceptions with empty stack traces here, and treat them differently.
          if (e.getStackTrace().length == 0) {
            error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
          } else {
            throw e
          }
      }
    } else {
      runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
    }
  }

  // Let the main class re-initialize the logging system once it starts.
  if (uninitLog) {
    Logging.uninitialize()
  }

  // In standalone cluster mode, there are two submission gateways:
  //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
  //   (2) The new REST-based gateway introduced in Spark 1.3
  // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
  // to use the legacy gateway if the master endpoint turns out to be not a REST server.
  if (args.isStandaloneCluster && args.useRest) {
    try {
      logInfo("Running Spark using the REST application submission protocol.")
      doRunMain()
    } catch {
      // Fail over to use the legacy submission gateway
      case e: SubmitRestConnectionException =>
        logWarning(s"Master endpoint ${args.master} was not a REST server. " +
          "Falling back to legacy submission gateway instead.")
        args.useRest = false
        submit(args, false)
    }
  // In all other modes, just run the main class as prepared
  } else {
    doRunMain()
  }
}
The last part of submit(...) calls its nested helper doRunMain(), which branches on the args.proxyUser argument:
1) if a proxy user is specified, the runMain() call is wrapped in proxyUser.doAs(...), as sketched below;
2) otherwise runMain() is called directly.
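A minimal sketch of that proxy-user pattern, using the Hadoop UGI API (the user name in the usage example is illustrative, and impersonation must be allowed by the cluster's hadoop.proxyuser.* settings):

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object ProxyUserSketch {
  // Run `body` as `proxyUserName` on top of the current (real) user's credentials.
  def runAsProxyUser(proxyUserName: String)(body: => Unit): Unit = {
    val proxyUser = UserGroupInformation.createProxyUser(
      proxyUserName, UserGroupInformation.getCurrentUser)
    proxyUser.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = body
    })
  }
}

// e.g. ProxyUserSketch.runAsProxyUser("etl_user") { println(UserGroupInformation.getCurrentUser) }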
Preparing the launch environment
prepareSubmitEnvironment sets up the appropriate classpath, system properties and application arguments so that the child main class can be run for the chosen cluster manager and deploy mode:
val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
/**
 * Prepare the environment for submitting an application.
 *
 * @param args the parsed SparkSubmitArguments used for environment preparation.
 * @param conf the Hadoop Configuration, this argument will only be set in unit test.
 * @return a 4-tuple:
 *        (1) the arguments for the child process,
 *        (2) a list of classpath entries for the child,
 *        (3) a map of system properties, and
 *        (4) the main class for the child
 *
 * That is, a 4-tuple (childArgs, childClasspath, sparkConf, childMainClass):
 *   childArgs:      arguments for the child process
 *   childClasspath: classpath entries for the child
 *   sparkConf:      the Spark/system properties
 *   childMainClass: the main class to run in the child
 *
 * Exposed for testing.
 */
private[deploy] def prepareSubmitEnvironment(
    args: SparkSubmitArguments,
    conf: Option[HadoopConfiguration] = None)
    : (Seq[String], Seq[String], SparkConf, String) = {
  // Return values
  val childArgs = new ArrayBuffer[String]()
  val childClasspath = new ArrayBuffer[String]()
  val sparkConf = new SparkConf()
  var childMainClass = ""

  // Set the cluster manager. As this list shows, the cluster managers Spark currently
  // supports are YARN, STANDALONE, MESOS, KUBERNETES and LOCAL, selected through the
  // --master argument of spark-submit.
  val clusterManager: Int = args.master match {
    case "yarn" => YARN
    case "yarn-client" | "yarn-cluster" =>
      // Before Spark 2.0, "yarn-client" and "yarn-cluster" were accepted as --master values.
      // Since Spark 2.0 they are deprecated: they are converted to "yarn" here and a warning
      // is printed.
      logWarning(s"Master ${args.master} is deprecated since 2.0." +
        " Please use master \"yarn\" with specified deploy mode instead.")
      YARN
    case m if m.startsWith("spark") => STANDALONE
    case m if m.startsWith("mesos") => MESOS
    case m if m.startsWith("k8s") => KUBERNETES
    case m if m.startsWith("local") => LOCAL
    case _ =>
      error("Master must either be yarn or start with spark, mesos, k8s, or local")
      -1
  }

  // Set the deploy mode; default is client mode
  var deployMode: Int = args.deployMode match {
    case "client" | null => CLIENT
    case "cluster" => CLUSTER
    case _ =>
      error("Deploy mode must be either client or cluster")
      -1
  }

  // Because the deprecated way of specifying "yarn-cluster" and "yarn-client" encapsulate both
  // the master and deploy mode, we have some logic to infer the master and deploy mode
  // from each other if only one is specified, or exit early if they are at odds.
  if (clusterManager == YARN) {
    (args.master, args.deployMode) match {
      case ("yarn-cluster", null) =>
        deployMode = CLUSTER
        args.master = "yarn"
      case ("yarn-cluster", "client") =>
        error("Client deploy mode is not compatible with master \"yarn-cluster\"")
      case ("yarn-client", "cluster") =>
        error("Cluster deploy mode is not compatible with master \"yarn-client\"")
      case (_, mode) =>
        args.master = "yarn"
    }

    // Make sure YARN is included in our build if we're trying to use it
    if (!Utils.classIsLoadable(YARN_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
      error(
        "Could not load YARN classes. " +
        "This copy of Spark may not have been compiled with YARN support.")
    }
  }

  if (clusterManager == KUBERNETES) {
    args.master = Utils.checkAndGetK8sMasterUrl(args.master)
    // Make sure KUBERNETES is included in our build if we're trying to use it
    if (!Utils.classIsLoadable(KUBERNETES_CLUSTER_SUBMIT_CLASS) && !Utils.isTesting) {
      error(
        "Could not load KUBERNETES classes. " +
        "This copy of Spark may not have been compiled with KUBERNETES support.")
    }
  }

  // Fail fast, the following modes are not supported or applicable
  (clusterManager, deployMode) match {
    case (STANDALONE, CLUSTER) if args.isPython =>
      error("Cluster deploy mode is currently not supported for python " +
        "applications on standalone clusters.")
    case (STANDALONE, CLUSTER) if args.isR =>
      error("Cluster deploy mode is currently not supported for R " +
        "applications on standalone clusters.")
    case (LOCAL, CLUSTER) =>
      error("Cluster deploy mode is not compatible with master \"local\"")
    case (_, CLUSTER) if isShell(args.primaryResource) =>
      error("Cluster deploy mode is not applicable to Spark shells.")
    case (_, CLUSTER) if isSqlShell(args.mainClass) =>
      error("Cluster deploy mode is not applicable to Spark SQL shell.")
    case (_, CLUSTER) if isThriftServer(args.mainClass) =>
      error("Cluster deploy mode is not applicable to Spark Thrift server.")
    case _ =>
  }

  // Update args.deployMode if it is null. It will be passed down as a Spark property later.
  (args.deployMode, deployMode) match {
    case (null, CLIENT) => args.deployMode = "client"
    case (null, CLUSTER) => args.deployMode = "cluster"
    case _ =>
  }

  // Derive a few special combinations from the cluster manager and the deploy mode.
  val isYarnCluster = clusterManager == YARN && deployMode == CLUSTER
  val isMesosCluster = clusterManager == MESOS && deployMode == CLUSTER
  val isStandAloneCluster = clusterManager == STANDALONE && deployMode == CLUSTER
  val isKubernetesCluster = clusterManager == KUBERNETES && deployMode == CLUSTER
  val isMesosClient = clusterManager == MESOS && deployMode == CLIENT

  if (!isMesosCluster && !isStandAloneCluster) {
    // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
    // too for packages that include Python code
    val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
      args.packagesExclusions, args.packages, args.repositories, args.ivyRepoPath,
      args.ivySettingsPath)

    if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
      args.jars = mergeFileLists(args.jars, resolvedMavenCoordinates)
      if (args.isPython || isInternal(args.primaryResource)) {
        args.pyFiles = mergeFileLists(args.pyFiles, resolvedMavenCoordinates)
      }
    }

    // install any R packages that may have been passed through --jars or --packages.
    // Spark Packages may contain R source code inside the jar.
    if (args.isR && !StringUtils.isBlank(args.jars)) {
      RPackageUtils.checkAndBuildRPackage(args.jars, printStream, args.verbose)
    }
  }

  args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) }
  val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
  val targetDir = Utils.createTempDir()

  // assure a keytab is available from any place in a JVM
  if (clusterManager == YARN || clusterManager == LOCAL || isMesosClient) {
    if (args.principal != null) {
      if (args.keytab != null) {
        require(new File(args.keytab).exists(), s"Keytab file: ${args.keytab} does not exist")
        // Add keytab and principal configurations in sysProps to make them available
        // for later use; e.g. in spark sql, the isolated class loader used to talk
        // to HiveMetastore will use these settings. They will be set as Java system
        // properties and then loaded by SparkConf
        sparkConf.set(KEYTAB, args.keytab)
        sparkConf.set(PRINCIPAL, args.principal)
        UserGroupInformation.loginUserFromKeytab(args.principal, args.keytab)
      }
    }
  }

  // Resolve glob path for different resources.
  args.jars = Option(args.jars).map(resolveGlobPaths(_, hadoopConf)).orNull
  args.files = Option(args.files).map(resolveGlobPaths(_, hadoopConf)).orNull
  args.pyFiles = Option(args.pyFiles).map(resolveGlobPaths(_, hadoopConf)).orNull
  args.archives = Option(args.archives).map(resolveGlobPaths(_, hadoopConf)).orNull

  lazy val secMgr = new SecurityManager(sparkConf)

  // In client mode, download remote files.
  var localPrimaryResource: String = null
  var localJars: String = null
  var localPyFiles: String = null
  if (deployMode == CLIENT) {
    localPrimaryResource = Option(args.primaryResource).map {
      downloadFile(_, targetDir, sparkConf, hadoopConf, secMgr)
    }.orNull
    localJars = Option(args.jars).map {
      downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
    }.orNull
    localPyFiles = Option(args.pyFiles).map {
      downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
    }.orNull
  }

  // When running in YARN, for some remote resources with scheme:
  //   1. Hadoop FileSystem doesn't support them.
  //   2. We explicitly bypass Hadoop FileSystem with "spark.yarn.dist.forceDownloadSchemes".
  // We will download them to local disk prior to add to YARN's distributed cache.
  // For yarn client mode, since we already download them with above code, so we only need to
  // figure out the local path and replace the remote one.
  if (clusterManager == YARN) {
    val forceDownloadSchemes = sparkConf.get(FORCE_DOWNLOAD_SCHEMES)

    def shouldDownload(scheme: String): Boolean = {
      forceDownloadSchemes.contains("*") || forceDownloadSchemes.contains(scheme) ||
        Try { FileSystem.getFileSystemClass(scheme, hadoopConf) }.isFailure
    }

    def downloadResource(resource: String): String = {
      val uri = Utils.resolveURI(resource)
      uri.getScheme match {
        case "local" | "file" => resource
        case e if shouldDownload(e) =>
          val file = new File(targetDir, new Path(uri).getName)
          if (file.exists()) {
            file.toURI.toString
          } else {
            downloadFile(resource, targetDir, sparkConf, hadoopConf, secMgr)
          }
        case _ => uri.toString
      }
    }

    args.primaryResource = Option(args.primaryResource).map { downloadResource }.orNull
    args.files = Option(args.files).map { files =>
      Utils.stringToSeq(files).map(downloadResource).mkString(",")
    }.orNull
    args.pyFiles = Option(args.pyFiles).map { pyFiles =>
      Utils.stringToSeq(pyFiles).map(downloadResource).mkString(",")
    }.orNull
    args.jars = Option(args.jars).map { jars =>
      Utils.stringToSeq(jars).map(downloadResource).mkString(",")
    }.orNull
    args.archives = Option(args.archives).map { archives =>
      Utils.stringToSeq(archives).map(downloadResource).mkString(",")
    }.orNull
  }

  // If we're running a python app, set the main class to our specific python runner
  ...

  // In YARN mode for an R app, add the SparkR package archive and the R package
  // archive containing all of the built R libraries to archives so that they can
  // be distributed with the job
  ...

  // TODO: Support distributing R packages with standalone cluster
  ...

  // TODO: Support distributing R packages with mesos cluster
  ...

  // If we're running an R app, set the main class to our specific R runner
  ...

  // Special flag to avoid deprecation warnings at the client
  sys.props("SPARK_SUBMIT") = "true"

  // In client mode, launch the application main class directly
  // In addition, add the main application jar and any added jars (if any) to the classpath
  if (deployMode == CLIENT) {
    childMainClass = args.mainClass
    if (localPrimaryResource != null && isUserJar(localPrimaryResource)) {
      childClasspath += localPrimaryResource
    }
    if (localJars != null) { childClasspath ++= localJars.split(",") }
  }
  // Add the main application jar and any added jars to classpath in case YARN client
  // requires these jars.
  // This assumes both primaryResource and user jars are local jars, or already downloaded
  // to local by configuring "spark.yarn.dist.forceDownloadSchemes", otherwise it will not be
  // added to the classpath of YARN client.
  if (isYarnCluster) {
    if (isUserJar(args.primaryResource)) {
      childClasspath += args.primaryResource
    }
    if (args.jars != null) { childClasspath ++= args.jars.split(",") }
  }

  if (deployMode == CLIENT) {
    if (args.childArgs != null) { childArgs ++= args.childArgs }
  }

  // Map all arguments to command-line options or system properties for our chosen mode
  for (opt <- options) {
    if (opt.value != null &&
        (deployMode & opt.deployMode) != 0 &&
        (clusterManager & opt.clusterManager) != 0) {
      if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
      if (opt.confKey != null) { sparkConf.set(opt.confKey, opt.value) }
    }
  }

  // In case of shells, spark.ui.showConsoleProgress can be true by default or by user.
  if (isShell(args.primaryResource) && !sparkConf.contains(UI_SHOW_CONSOLE_PROGRESS)) {
    sparkConf.set(UI_SHOW_CONSOLE_PROGRESS, true)
  }

  // Let YARN know it's a pyspark app, so it distributes needed libraries.
  if (clusterManager == YARN) {
    if (args.isPython) {
      sparkConf.set("spark.yarn.isPython", "true")
    }
  }

  // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
  if (isYarnCluster) {
    childMainClass = YARN_CLUSTER_SUBMIT_CLASS
    if (args.isPython) {
      childArgs += ("--primary-py-file", args.primaryResource)
      childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
    } else if (args.isR) {
      val mainFile = new Path(args.primaryResource).getName
      childArgs += ("--primary-r-file", mainFile)
      childArgs += ("--class", "org.apache.spark.deploy.RRunner")
    } else {
      if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
        childArgs += ("--jar", args.primaryResource)
      }
      childArgs += ("--class", args.mainClass)
    }
    if (args.childArgs != null) {
      args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
    }
  }

  // Load any properties specified through --conf and the default properties file
  for ((k, v) <- args.sparkProperties) {
    sparkConf.setIfMissing(k, v)
  }

  // Ignore invalid spark.driver.host in cluster modes.
  if (deployMode == CLUSTER) {
    sparkConf.remove("spark.driver.host")
  }

  // Resolve paths in certain spark properties
  val pathConfigs = Seq(
    "spark.jars",
    "spark.files",
    "spark.yarn.dist.files",
    "spark.yarn.dist.archives",
    "spark.yarn.dist.jars")
  pathConfigs.foreach { config =>
    // Replace old URIs with resolved URIs, if they exist
    sparkConf.getOption(config).foreach { oldValue =>
      sparkConf.set(config, Utils.resolveURIs(oldValue))
    }
  }

  // Resolve and format python file paths properly before adding them to the PYTHONPATH.
  // The resolving part is redundant in the case of --py-files, but necessary if the user
  // explicitly sets `spark.submit.pyFiles` in his/her default properties file.
  sparkConf.getOption("spark.submit.pyFiles").foreach { pyFiles =>
    val resolvedPyFiles = Utils.resolveURIs(pyFiles)
    val formattedPyFiles = if (!isYarnCluster && !isMesosCluster) {
      PythonRunner.formatPaths(resolvedPyFiles).mkString(",")
    } else {
      // Ignoring formatting python path in yarn and mesos cluster mode, these two modes
      // support dealing with remote python files, they could distribute and add python files
      // locally.
      resolvedPyFiles
    }
    sparkConf.set("spark.submit.pyFiles", formattedPyFiles)
  }

  (childArgs, childClasspath, sparkConf, childMainClass)
}
Preparing the execution class for the cluster manager (YARN)
When a job is launched with spark-submit (https://github.com/apache/spark/blob/branch-2.4/bin/spark-submit), what actually runs is exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@".
In SparkSubmit, the method

private[deploy] def prepareSubmitEnvironment(
    args: SparkSubmitArguments,
    conf: Option[HadoopConfiguration] = None): (Seq[String], Seq[String], SparkConf, String)

prepares the launch environment for the submission, as shown above.
Inside that method, when the cluster manager is YARN:
1) When --deploy-mode is cluster
the submission is handed to YarnClusterApplication.
YarnClusterApplication is defined alongside org.apache.spark.deploy.yarn.Client (in Client.scala); its start method creates a Client object and calls its run method:
private[spark] class YarnClusterApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove("spark.jars")
    conf.remove("spark.files")

    new Client(new ClientArguments(args), conf).run()
  }

}
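For reference, in yarn-cluster mode the childArgs built by prepareSubmitEnvironment and handed to YarnClusterApplication take roughly the following shape (the jar path, class name and argument below are made up for illustration); ClientArguments then parses them back into the user jar, main class and application arguments:

val childArgs = Array(
  "--jar",   "/path/to/examples.jar",              // the primary resource
  "--class", "org.apache.spark.examples.SparkPi",  // the user's main class
  "--arg",   "1000"                                // one --arg entry per application argument
)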
2) When --deploy-mode is client
the main function of the application jar itself is invoked, through JavaMainApplication:
/**
 * Entry point for a Spark application. Implementations must provide a no-argument constructor.
 */
private[spark] trait SparkApplication {

  def start(args: Array[String], conf: SparkConf): Unit

}

/**
 * Implementation of SparkApplication that wraps a standard Java class with a "main" method.
 *
 * Configuration is propagated to the application via system properties, so running multiple
 * of these in the same JVM may lead to undefined behavior due to configuration leaks.
 */
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }

    mainMethod.invoke(null, args)
  }

}
As the JavaMainApplication implementation shows, its start method simply invokes the main function of the application jar via reflection.
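As a standalone illustration of that pattern (not Spark source; the object name here is made up), the reflective call boils down to:

import java.lang.reflect.Modifier

object ReflectiveMainRunner {
  // Look up a static `main(Array[String])` on the named class and invoke it,
  // mirroring what JavaMainApplication.start does.
  def runMain(className: String, args: Array[String]): Unit = {
    val klass = Class.forName(className, true, Thread.currentThread().getContextClassLoader)
    val mainMethod = klass.getMethod("main", classOf[Array[String]])
    require(Modifier.isStatic(mainMethod.getModifiers), "main must be static")
    mainMethod.invoke(null, args)
  }
}

// e.g. ReflectiveMainRunner.runMain("org.apache.spark.examples.SparkPi", Array("1000"))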
Submitting to YARN
The yarn-cluster flow:
In yarn-cluster mode, YarnClusterApplication runs Client's run method, and Client#run() implements the submission flow:
/**
 * Submit an application to the ResourceManager.
 * If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
 * reporting the application's status until the application has exited for any reason.
 * Otherwise, the client process will exit after submission.
 * If the application finishes with a failed, killed, or undefined status,
 * throw an appropriate SparkException.
 */
def run(): Unit = {
  this.appId = submitApplication()
  if (!launcherBackend.isConnected() && fireAndForget) {
    val report = getApplicationReport(appId)
    val state = report.getYarnApplicationState
    logInfo(s"Application report for $appId (state: $state)")
    logInfo(formatReportDetails(report))
    if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
      throw new SparkException(s"Application $appId finished with status: $state")
    }
  } else {
    val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
    if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
      diags.foreach { err =>
        logError(s"Application diagnostics message: $err")
      }
      throw new SparkException(s"Application $appId finished with failed status")
    }
    if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
      throw new SparkException(s"Application $appId is killed")
    }
    if (finalState == FinalApplicationStatus.UNDEFINED) {
      throw new SparkException(s"The final status of application $appId is undefined")
    }
  }
}
Client.run() calls submitApplication(), which is implemented as follows:
/**
 * Submit an application running our ApplicationMaster to the ResourceManager.
 *
 * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
 * creating applications and setting up the application submission context. This was not
 * available in the alpha API.
 */
def submitApplication(): ApplicationId = {
  var appId: ApplicationId = null
  try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

    logInfo("Requesting a new application from cluster with %d NodeManagers"
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    appId = newAppResponse.getApplicationId()

    new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appId.toString)).setCurrentContext()

    // Verify whether the cluster has enough resources for our AM
    verifyClusterResources(newAppResponse)

    // Set up the appropriate contexts to launch our AM
    val containerContext = createContainerLaunchContext(newAppResponse)
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    logInfo(s"Submitting application $appId to ResourceManager")
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)

    appId
  } catch {
    case e: Throwable =>
      if (appId != null) {
        cleanupStagingDir(appId)
      }
      throw e
  }
}
The run() method submits the job to YARN's ResourceManager (referred to below as RM) so that our ApplicationMaster (referred to below as AM) is launched, and then monitors the application.
The stable YARN API provides a convenience method (YarnClient#createApplication) for creating an application and setting up the application submission context.
submitApplication() performs the following steps (a stripped-down sketch using the raw YarnClient API follows the list):
- Initialize and start a YarnClient; the YarnClient APIs are used for everything that follows.
- Call yarnClient#createApplication() to obtain a new application (newApp) from the RM; this application will run the AM. newApp#getNewApplicationResponse() returns the resource information for it (newAppResponse).
- Use newAppResponse to verify that the cluster has enough resources to run the AM.
- Set up the appropriate contexts to launch the AM.
- Call yarnClient#submitApplication(appContext) to ask YARN to start the application, and then monitor it.
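The following sketch repeats the same YarnClient calls using only Hadoop's public API. It omits the AM container setup, which is the bulk of Spark's Client, and leaves submitApplication commented out because the submission context is not filled in here:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

object RawYarnSubmitSketch {
  def main(args: Array[String]): Unit = {
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration(new Configuration()))
    yarnClient.start()

    val newApp = yarnClient.createApplication()              // ask the RM for a new application
    val newAppResponse = newApp.getNewApplicationResponse
    val appId = newAppResponse.getApplicationId
    println(s"Got application id: $appId, " +
      s"max container resources: ${newAppResponse.getMaximumResourceCapability}")

    val appContext = newApp.getApplicationSubmissionContext  // Spark fills this with the AM
    appContext.setApplicationName("sketch-only")             // container spec, queue, app type, ...
    // yarnClient.submitApplication(appContext)              // submit once the context is complete
    yarnClient.stop()
  }
}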
The yarn-client flow:
- In client deploy mode, SparkSubmit's main invokes the application's main method via reflection.
- The application's main method creates a SparkContext.
- While the SparkContext instance is being constructed, the scheduler and backend instances are created with the following code:
private var _schedulerBackend: SchedulerBackend = _
private var _taskScheduler: TaskScheduler = _

private[spark] def schedulerBackend: SchedulerBackend = _schedulerBackend

private[spark] def taskScheduler: TaskScheduler = _taskScheduler
private[spark] def taskScheduler_=(ts: TaskScheduler): Unit = {
  _taskScheduler = ts
}

// Assigned in the SparkContext constructor:
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/SparkContext.scala
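For reference, a minimal user application that would go down this path when launched with --master yarn --deploy-mode client (illustrative only; the app logic is arbitrary):

import org.apache.spark.{SparkConf, SparkContext}

object MyApp {
  def main(args: Array[String]): Unit = {
    // In client mode this main is invoked by SparkSubmit via reflection (JavaMainApplication);
    // constructing the SparkContext is what triggers createTaskScheduler shown above.
    val conf = new SparkConf().setAppName("my-app")  // master/deploy-mode come from spark-submit
    val sc = new SparkContext(conf)
    try {
      val n = sc.parallelize(1 to 1000).map(_ * 2).count()
      println(s"count = $n")
    } finally {
      sc.stop()
    }
  }
}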
SparkContext initialization
In YARN mode, where the SparkContext is initialized depends on --deploy-mode:
In yarn-cluster mode, the client first asks the RM (YARN ResourceManager) for a container in which to start the AM (ApplicationMaster) process, and the SparkContext runs inside that AM process.
In yarn-client mode, the SparkContext is initialized on the submitting node, driven by the client-side wrapper (JavaMainApplication).
/**
 * Create a task scheduler based on a given master URL.
 * Return a 2-tuple of the scheduler backend and the task scheduler.
 */
private def createTaskScheduler(...): (SchedulerBackend, TaskScheduler) = {
  ...
  master match {
    case "local" =>
      ...
    case LOCAL_N_REGEX(threads) =>
      ...
    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
      ...
    case SPARK_REGEX(sparkUrl) =>
      ...
    case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
      ...
    case masterUrl =>
      val cm = getClusterManager(masterUrl) match {
        case Some(clusterMgr) => clusterMgr
        case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
      }
      try {
        val scheduler = cm.createTaskScheduler(sc, masterUrl)
        val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
        cm.initialize(scheduler, backend)
        (backend, scheduler)
      } catch {
        case se: SparkException => throw se
        case NonFatal(e) =>
          throw new SparkException("External scheduler cannot be instantiated", e)
      }
  }
}

private def getClusterManager(url: String): Option[ExternalClusterManager] = {
  val loader = Utils.getContextOrSparkClassLoader
  val serviceLoaders =
    ServiceLoader.load(classOf[ExternalClusterManager], loader).asScala.filter(_.canCreate(url))
  if (serviceLoaders.size > 1) {
    throw new SparkException(
      s"Multiple external cluster managers registered for the url $url: $serviceLoaders")
  }
  serviceLoaders.headOption
}
1) SparkContext#createTaskScheduler(...)
creates a different TaskScheduler and SchedulerBackend for each cluster manager.
1.1) The SchedulerBackend talks to the cluster manager to obtain the resources allocated to the application.
1.2) The TaskScheduler schedules across jobs, receives the allocated resources, and then hands resources to each Task.
2) SparkContext#createTaskScheduler(...)
The final match case handles the external cluster managers, i.e. everything other than local and standalone (spark://) masters: Mesos, YARN and Kubernetes.
Under case masterUrl, getClusterManager(masterUrl) is called; for YARN it returns a YarnClusterManager instance, a class that implements the ExternalClusterManager interface.
Note: other classes implementing ExternalClusterManager include:
MesosClusterManager (https://github.com/apache/spark/blob/branch-2.4/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterManager.scala)
KubernetesClusterManager (https://github.com/apache/spark/blob/branch-2.4/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterManager.scala)
The ExternalClusterManager interface is defined as:
private[spark] trait ExternalClusterManager {

  def canCreate(masterURL: String): Boolean

  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler

  def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend

  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
}
ExternalClusterManager declares four methods (a skeletal implementation sketch follows the list):
- canCreate(masterURL: String): Boolean — checks whether this cluster manager instance can create scheduler components for the given masterURL.
- createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler — creates a TaskScheduler instance for the given SparkContext.
- createSchedulerBackend(sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend — creates a SchedulerBackend for the given SparkContext and scheduler; called after the TaskScheduler has been created with ExternalClusterManager.createTaskScheduler().
- initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit — initializes the TaskScheduler and SchedulerBackend; called after the scheduler components have been created.
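To make the contract concrete, here is a skeletal implementation sketch; the ToyClusterManager name and the toy master prefix are made up, and the backend is left unimplemented. Real implementations such as YarnClusterManager are discovered through Java's ServiceLoader, i.e. a META-INF/services/org.apache.spark.scheduler.ExternalClusterManager file listing the implementation class, which is exactly what getClusterManager above loads:

package org.apache.spark.scheduler

import org.apache.spark.SparkContext

// Illustrative only. Scheduler classes such as TaskSchedulerImpl are private[spark],
// so a real implementation has to be compiled against Spark internals, as the
// Yarn/Mesos/Kubernetes modules are.
private[spark] class ToyClusterManager extends ExternalClusterManager {

  override def canCreate(masterURL: String): Boolean = masterURL.startsWith("toy")

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler =
    new TaskSchedulerImpl(sc)

  override def createSchedulerBackend(
      sc: SparkContext, masterURL: String, scheduler: TaskScheduler): SchedulerBackend =
    ??? // would talk to the "toy" resource manager to obtain executors

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit =
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}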
The YarnClusterManager class is defined as:
private[spark] class YarnClusterManager extends ExternalClusterManager {

  override def canCreate(masterURL: String): Boolean = {
    masterURL == "yarn"
  }

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    sc.deployMode match {
      case "cluster" => new YarnClusterScheduler(sc)
      case "client" => new YarnScheduler(sc)
      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    sc.deployMode match {
      case "cluster" =>
        new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case "client" =>
        new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case _ =>
        throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}
YarnClusterManager#createTaskScheduler(...):
branches on the SparkContext's deployMode property:
in client mode it returns a YarnScheduler (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala) instance;
in cluster mode it returns a YarnClusterScheduler (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala) instance.
YarnClusterManager#createSchedulerBackend(...):
also branches on the SparkContext's deployMode property:
in client mode it returns a YarnClientSchedulerBackend (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala) instance;
in cluster mode it returns a YarnClusterSchedulerBackend (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala) instance.
How a Spark job runs on YARN: architecture analysis
1. Spark on YARN in cluster mode: what happens from job submission to job completion.
Client-side steps
- 1. Initialize a yarnClient from yarnConf and start it.
- 2. Create a client application and obtain its application ID, then check whether the cluster has enough resources for the executors and the ApplicationMaster; if not, an IllegalArgumentException is thrown.
- 3. Set up resources and environment variables: the application's staging directory, local resources (jar files, log4j.properties), the application's environment variables, the container launch context, and so on.
- 4. Set up the application submission context: the application name, the queue, the container requested for the AM, and the job type marked as Spark.
- 5. Request memory and finally submit the application to the ResourceManager via yarnClient.submitApplication.
Once the job has been submitted to YARN, the client has nothing left to do; you can even kill the client process in the terminal, because the whole job runs on the YARN cluster and the results go to HDFS or to the logs.
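The same fire-and-forget submission can be done programmatically with SparkLauncher; a sketch (the jar path and class are illustrative, and SPARK_HOME/HADOOP_CONF_DIR must be available in the environment):

import org.apache.spark.launcher.SparkLauncher

object SubmitToYarnCluster {
  def main(args: Array[String]): Unit = {
    val handle = new SparkLauncher()
      .setAppResource("/path/to/examples.jar")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setMaster("yarn")
      .setDeployMode("cluster")          // in cluster mode this JVM may exit after submission
      .setConf("spark.executor.memory", "2g")
      .addAppArgs("1000")
      .startApplication()                // returns a SparkAppHandle for state callbacks

    println(s"submitted, state = ${handle.getState}")
  }
}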
After submission to the YARN cluster: YARN-side steps
- 1. ApplicationMaster's run method is executed.
- 2. The relevant environment variables are set up.
- 3. An amClient is created and started.
- 4. The AmIpFilter for the Spark UI is configured before the Spark UI starts.
- 5. startUserClass starts a dedicated thread (named "Driver") that runs the user application, i.e. it starts the Driver; the SparkContext is initialized inside that Driver.
- 6. Wait for the SparkContext to finish initializing, for at most spark.yarn.applicationMaster.waitTries attempts (default 10; newer Spark versions use a time limit, spark.yarn.am.waitTime, instead). If the wait limit is exceeded the process exits; otherwise the SparkContext is used to initialize the yarnAllocator.
How do we know that the SparkContext has finished initializing?
During step 5 the user application initializes the SparkContext, and that initialization creates a YarnClusterScheduler. When the SparkContext finishes initializing, it calls YarnClusterScheduler's postStartHook method, and that hook notifies the ApplicationMaster that the SparkContext is ready.
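The handshake can be pictured with a latch; this is only a generic sketch of the pattern described above, not Spark's actual implementation:

import java.util.concurrent.{CountDownLatch, TimeUnit}

object SparkContextReadyHandshake {
  private val ready = new CountDownLatch(1)

  // Called from the scheduler's post-start hook once SparkContext initialization completes.
  def sparkContextInitialized(): Unit = ready.countDown()

  // Called by the ApplicationMaster after it has started the user class in the Driver thread;
  // returns false if the context did not come up within the timeout.
  def waitForSparkContext(timeoutMs: Long): Boolean = ready.await(timeoutMs, TimeUnit.MILLISECONDS)
}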
- 7. Once the SparkContext and Driver are initialized, the ApplicationMaster registers itself with the ResourceManager through amClient.
- 8. Allocate and start the executors. Before launching executors, numExecutors containers are first obtained through the yarnAllocator, and the executors are then launched inside those containers. If executor launches fail more than maxNumExecutorFailures times, computed as follows:
// Default to numExecutors * 2, with minimum of 3
private val maxNumExecutorFailures = sparkConf.getInt("spark.yarn.max.executor.failures",
  sparkConf.getInt("spark.yarn.max.worker.failures", math.max(args.numExecutors * 2, 3)))
then the application fails: its status is marked FAILED and the SparkContext is shut down (for example, with args.numExecutors = 8 and neither property set, the limit is max(8 * 2, 3) = 16 failures). Executors are actually started through ExecutorRunnable, which in turn launches CoarseGrainedExecutorBackend.
- 9. Finally, tasks run inside CoarseGrainedExecutorBackend, and their status is reported back to the driver's CoarseGrainedSchedulerBackend (the CoarseGrainedScheduler endpoint) over Spark's RPC layer (Akka in older Spark versions, the Netty-based RPC in Spark 2.x) until the job completes.
2. Spark on YARN in client mode: the full job flow
Spark on YARN has two modes: yarn-cluster and yarn-client. Both run the job on YARN, but they run it quite differently; this section walks through how a yarn-client job goes from submission to running.
As in yarn-cluster mode, the program is submitted with the spark-submit script. But in yarn-client mode the job is not wrapped and launched through the Client class; instead the job's main function is invoked directly via reflection. In detail:
- 1. SparkSubmit invokes the job's main function directly via reflection (runMain); in cluster mode the Client's main is invoked instead.
- 2. The application's main function always has a SparkContext and initializes it.
- 3. SparkContext initialization does the following, in order: set up the configuration; register MapOutputTracker, BlockManagerMaster and BlockManager; and create the taskScheduler and dagScheduler, of which the last two are the important ones. When creating the taskScheduler, the Scheduler and SchedulerBackend are chosen according to the master we passed in. Since we chose yarn-client mode, the program picks YarnScheduler (called YarnClientClusterScheduler in older Spark versions) and YarnClientSchedulerBackend, and initializes the scheduler with the backend instance; both are obtained dynamically through the cluster-manager plug-in mechanism described above. YarnClientSchedulerBackend is a subclass of CoarseGrainedSchedulerBackend, and YarnScheduler is a subclass of TaskSchedulerImpl that essentially only overrides getRackForHost.
- 4. After the taskScheduler is created, the dagScheduler is created, and then taskScheduler.start() is called; starting the taskScheduler also calls the SchedulerBackend's start method. While the SchedulerBackend starts, it assembles some parameters into ClientArguments, passes them into the Client class, and calls client.runApp() (client.submitApplication() in Spark 2.x) to obtain the application ID.
- 5. What client.runApp does is similar to the client-side steps described earlier; the difference is that the class started on YARN is ExecutorLauncher (in yarn-cluster mode it is ApplicationMaster).
- 6. ExecutorLauncher initializes and starts amClient, and then registers this ApplicationMaster with the ResourceManager. After registration it waits for the driver to start; once the driver is up, a monitor endpoint (MonitorActor in older versions) is created to communicate with CoarseGrainedSchedulerBackend (only the AddWebUIFilter event travels between them; task status does not), and addAmIpFilter is set up. When the job completes, ExecutorLauncher sets the application's status to FinalApplicationStatus.SUCCEEDED via amClient.
- 7. Executors are allocated; the allocation logic is similar to the yarn-cluster case and is not repeated here.
- 8. Finally, tasks run inside CoarseGrainedExecutorBackend, and their status is reported back to CoarseGrainedSchedulerBackend over Spark's RPC until the job completes.
- 9. While the job is running, YarnClientSchedulerBackend polls the job's status through the client about once a second and prints the corresponding progress information; when the application's status becomes FINISHED, FAILED or KILLED, the program stops waiting (as sketched below).
- 10. Finally, a thread re-checks the application's status; once it is FINISHED, FAILED or KILLED, the run is complete and the SparkContext is stopped. That is the whole process.
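The polling in steps 9 and 10 can be sketched with the public YarnClient API (illustrative; Spark's YarnClientSchedulerBackend does the equivalent through Client#monitorApplication):

import org.apache.hadoop.yarn.api.records.{ApplicationId, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

object PollAppState {
  def waitForCompletion(appId: ApplicationId): YarnApplicationState = {
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()
    try {
      var state = yarnClient.getApplicationReport(appId).getYarnApplicationState
      while (state != YarnApplicationState.FINISHED &&
             state != YarnApplicationState.FAILED &&
             state != YarnApplicationState.KILLED) {
        Thread.sleep(1000)  // report roughly once per second, as the backend does
        state = yarnClient.getApplicationReport(appId).getYarnApplicationState
      }
      state
    } finally {
      yarnClient.stop()
    }
  }
}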
YARN-Cluster architecture
In YARN-Cluster mode, after a user submits an application to YARN, YARN runs it in two phases:
- 1. The first phase starts Spark's Driver as an ApplicationMaster on the YARN cluster;
- 2. The second phase is the ApplicationMaster creating the application, requesting resources from the ResourceManager for it, and starting Executors to run Tasks, while monitoring the whole run until it finishes.
In more detail (a minimal AM-side sketch using Hadoop's AMRMClient follows this list):
- The Spark YARN client submits the application to YARN, including the ApplicationMaster program, the command that starts the ApplicationMaster, the program to run in the Executors, and so on;
- After receiving the request, the ResourceManager picks a NodeManager in the cluster, allocates the first container for the application and asks that NodeManager to start the application's ApplicationMaster in it; the ApplicationMaster then performs the SparkContext initialization, among other things;
- The ApplicationMaster registers with the ResourceManager, so that the user can check the application's status directly through the ResourceManager; it then requests resources for the tasks over RPC in a polling fashion and monitors their status until the run ends;
- Once the ApplicationMaster has obtained resources (i.e. containers), it talks to the corresponding NodeManagers and asks them to start CoarseGrainedExecutorBackend in those containers. After starting, each CoarseGrainedExecutorBackend registers with the SparkContext inside the ApplicationMaster and requests Tasks. This is the same as in standalone mode, except that when the SparkContext is initialized in the Spark application, task scheduling is done by CoarseGrainedSchedulerBackend together with YarnClusterScheduler, where YarnClusterScheduler is just a thin wrapper around TaskSchedulerImpl that adds waiting logic for Executors, etc.;
- The SparkContext in the ApplicationMaster assigns Tasks to the CoarseGrainedExecutorBackends, which run them and report status and progress back to the ApplicationMaster, so that the ApplicationMaster always knows the state of each task and can restart a task when it fails;
- When the application has finished, the ApplicationMaster asks the ResourceManager to deregister it and shuts itself down;
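For a concrete picture of the AM-side steps above, here is a minimal sketch using Hadoop's AMRMClient directly (Spark wraps this logic inside ApplicationMaster/YarnAllocator; the host names, resources and priorities below are illustrative):

import org.apache.hadoop.yarn.api.records.{Priority, Resource}
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration

object AmRegistrationSketch {
  def main(args: Array[String]): Unit = {
    val amClient = AMRMClient.createAMRMClient[ContainerRequest]()
    amClient.init(new YarnConfiguration())
    amClient.start()

    // Register this ApplicationMaster with the ResourceManager.
    amClient.registerApplicationMaster("am-host", 0, "http://am-host:4040")

    // Ask for one executor-sized container (2 GB / 2 cores here, just as an example).
    val capability = Resource.newInstance(2048, 2)
    amClient.addContainerRequest(new ContainerRequest(capability, null, null, Priority.newInstance(1)))

    val allocated = amClient.allocate(0.1f).getAllocatedContainers
    println(s"RM allocated ${allocated.size()} container(s)")
    // ... launch CoarseGrainedExecutorBackend in each container via NMClient,
    // and call unregisterApplicationMaster when the application finishes ...
  }
}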
Tracing the scripts that launch CoarseGrainedExecutorBackend:

1 [root@CDH-143 bin]$ yarn applicationattempt -list application_1559203334026_0010 2 19/05/31 09:36:10 INFO client.RMProxy: Connecting to ResourceManager at CDH-143/10.132.52.143:8032 3 Total number of application attempts :1 4 ApplicationAttempt-Id State AM-Container-Id Tracking-URL 5 appattempt_1559203334026_0010_000001 RUNNING container_1559203334026_0010_01_000001 http://CDH-143:8088/proxy/application_1559203334026_0010/ 6 7 [root@CDH-143 bin]$ yarn container -list appattempt_1559203334026_0010_000001 8 19/05/31 09:36:51 INFO client.RMProxy: Connecting to ResourceManager at CDH-143/10.132.52.143:8032 9 Total number of containers :16 10 Container-Id Start Time Finish Time State Host LOG-URL 11 container_1559203334026_0010_01_000015 Thu May 30 19:52:19 +0800 2019 N/A RUNNING CDH-146:8041 http://CDH-146:8042/node/containerlogs/container_1559203334026_0010_01_000015/dx 12 container_1559203334026_0010_01_000016 Thu May 30 19:52:19 +0800 2019 N/A RUNNING CDH-146:8041 http://CDH-146:8042/node/containerlogs/container_1559203334026_0010_01_000016/dx 13 container_1559203334026_0010_01_000003 Thu May 30 19:52:19 +0800 2019 N/A RUNNING CDH-141:8041 http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000003/dx 14 container_1559203334026_0010_01_000004 Thu May 30 19:52:19 +0800 2019 N/A RUNNING CDH-141:8041 http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000004/dx 15 container_1559203334026_0010_01_000005 Thu May 30 19:52:19 +0800 2019 N/A RUNNING CDH-141:8041 http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000005/dx 16 container_1559203334026_0010_01_000006 Thu May 30 19:52:19 +0800 2019 N/A RUNNING CDH-141:8041 http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000006/dx 17 container_1559203334026_0010_01_000001 Thu May 30 19:52:06 +0800 2019 N/A RUNNING CDH-142:8041 http://CDH-142:8042/node/containerlogs/container_1559203334026_0010_01_000001/dx 18 container_1559203334026_0010_01_000002 Thu May 30 19:52:19 +0800 2019 N/A RUNNING CDH-141:8041 http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000002/dx 19 container_1559203334026_0010_01_000011 Thu May 30 19:52:19 +0800 2019 N/A RUNNING CDH-146:8041 http://CDH-146:8042/node/containerlogs/container_1559203334026_0010_01_000011/dx 20 container_1559203334026_0010_01_000012 Thu May 30 19:52:19 +0800 2019 N/A RUNNING CDH-146:8041 http://CDH-146:8042/node/containerlogs/container_1559203334026_0010_01_000012/dx 21 container_1559203334026_0010_01_000013 Thu May 30 19:52:19 +0800 2019 N/A RUNNING CDH-146:8041 http://CDH-146:8042/node/containerlogs/container_1559203334026_0010_01_000013/dx 22 container_1559203334026_0010_01_000014 Thu May 30 19:52:19 +0800 2019 N/A RUNNING CDH-146:8041 http://CDH-146:8042/node/containerlogs/container_1559203334026_0010_01_000014/dx 23 container_1559203334026_0010_01_000007 Thu May 30 19:52:19 +0800 2019 N/A RUNNING CDH-141:8041 http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000007/dx 24 container_1559203334026_0010_01_000008 Thu May 30 19:52:19 +0800 2019 N/A RUNNING CDH-141:8041 http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000008/dx 25 container_1559203334026_0010_01_000009 Thu May 30 19:52:19 +0800 2019 N/A RUNNING CDH-141:8041 http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000009/dx 26 container_1559203334026_0010_01_000010 Thu May 30 19:52:19 +0800 2019 N/A RUNNING CDH-146:8041 
http://CDH-146:8042/node/containerlogs/container_1559203334026_0010_01_000010/dx 27 28 [root@CDH-141 ~]$ ps axu | grep container_1559203334026_0010_01_000003 29 yarn 30557 0.0 0.0 113144 1496 ? S May30 0:00 bash 30 /data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/default_container_executor.sh 31 yarn 30569 0.0 0.0 113280 1520 ? Ss May30 0:00 /bin/bash -c /usr/java/jdk1.8.0_171-amd64/bin/java 32 -server -Xmx6144m 33 -Djava.io.tmpdir=/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/tmp 34 '-Dspark.driver.port=50365' 35 '-Dspark.network.timeout=10000000' 36 '-Dspark.port.maxRetries=32' 37 -Dspark.yarn.app.container.log.dir=/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003 38 -XX:OnOutOfMemoryError='kill %p' 39 org.apache.spark.executor.CoarseGrainedExecutorBackend 40 --driver-url spark://CoarseGrainedScheduler@CDH-143:50365 41 --executor-id 2 42 --hostname CDH-141 43 --cores 2 44 --app-id application_1559203334026_0010 45 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/__app__.jar 46 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/dx-domain-perf-3.0.0.jar 47 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/dx-common-3.0.0.jar 48 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/spark-sql-kafka-0-10_2.11-2.4.0.jar 49 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/spark-avro_2.11-3.2.0.jar 50 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar 51 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/rocksdbjni-5.17.2.jar 52 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/kafka-clients-0.10.0.1.jar 53 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/elasticsearch-spark-20_2.11-6.4.1.jar 54 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/dx_Spark_State_Store_Plugin-1.0-SNAPSHOT.jar 55 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/bijection-core_2.11-0.9.5.jar 56 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/bijection-avro_2.11-0.9.5.jar 57 1>/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003/stdout 58 2>/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003/stderr 59 yarn 30700 161 5.3 8738480 7032916 ? 
Sl May30 1392:01 /usr/java/jdk1.8.0_171-amd64/bin/java 60 -server -Xmx6144m 61 -Djava.io.tmpdir=/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/tmp 62 -Dspark.driver.port=50365 63 -Dspark.network.timeout=10000000 64 -Dspark.port.maxRetries=32 65 -Dspark.yarn.app.container.log.dir=/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003 66 -XX:OnOutOfMemoryError=kill %p 67 org.apache.spark.executor.CoarseGrainedExecutorBackend 68 --driver-url spark://CoarseGrainedScheduler@CDH-143:50365 69 --executor-id 2 70 --hostname CDH-141 71 --cores 2 72 --app-id application_1559203334026_0010 73 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/__app__.jar 74 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/dx-domain-perf-3.0.0.jar 75 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/dx-common-3.0.0.jar 76 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/spark-sql-kafka-0-10_2.11-2.4.0.jar 77 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/spark-avro_2.11-3.2.0.jar 78 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar 79 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/rocksdbjni-5.17.2.jar 80 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/kafka-clients-0.10.0.1.jar 81 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/elasticsearch-spark-20_2.11-6.4.1.jar 82 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/dx_Spark_State_Store_Plugin-1.0-SNAPSHOT.jar 83 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/bijection-core_2.11-0.9.5.jar 84 --user-class-path file:/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/bijection-avro_2.11-0.9.5.jar 85 dx 37775 0.0 0.0 112780 952 pts/1 S+ 10:14 0:00 grep --color=auto container_1559203334026_0010_01_000003 86 87 88 [root@CDH-141 dx]# more /data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/default_container_executor.sh 89 #!/bin/bash 90 /bin/bash "/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/default_container_executor_session.sh" 91 rc=$? 
92 echo $rc > "/data6/yarn/nm/nmPrivate/application_1559203334026_0010/container_1559203334026_0010_01_000003/container_1559203334026_0010_01_000003.pid.exitcode.tmp" 93 /bin/mv -f "/data6/yarn/nm/nmPrivate/application_1559203334026_0010/container_1559203334026_0010_01_000003/container_1559203334026_0010_01_000003.pid.exitcode.tmp" 94 "/data6/yarn/nm/nmPrivate/application_1559203334026_0010/container_1559203334026_0010_01_000003/container_1559203334026_0010_01_000003.pid.exitcode" 95 exit $rc 96 97 [root@CDH-141 dx]# more /data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/default_container_executor_session.sh 98 #!/bin/bash 99 100 echo $$ > /data6/yarn/nm/nmPrivate/application_1559203334026_0010/container_1559203334026_0010_01_000003/container_1559203334026_0010_01_000003.pid.tmp 101 /bin/mv -f /data6/yarn/nm/nmPrivate/application_1559203334026_0010/container_1559203334026_0010_01_000003/container_1559203334026_0010_01_000003.pid.tmp 102 /data6/yarn/nm/nmPrivate/application_1559203334026_0010/container_1559203334026_0010_01_000003/container_1559203334026_0010_01_000003.pid 103 exec setsid /bin/bash "/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/launch_container.sh" 104 105 106 [root@CDH-141 dx]# more /data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/launch_container.sh 107 #!/bin/bash 108 109 export SPARK_YARN_STAGING_DIR="hdfs://CDH-143:8020/user/dx/.sparkStaging/application_1559203334026_0010" 110 export HADOOP_CONF_DIR="/run/cloudera-scm-agent/process/2037-yarn-NODEMANAGER" 111 export JAVA_HOME="/usr/java/jdk1.8.0_171-amd64" 112 export SPARK_LOG_URL_STDOUT="http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000003/dx/stdout?start=-4096" 113 export NM_HOST="CDH-141" 114 export HADOOP_HDFS_HOME="/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-hdfs" 115 export LOGNAME="dx" 116 export JVM_PID="$$" 117 export PWD="/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003" 118 export HADOOP_COMMON_HOME="/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop" 119 export LOCAL_DIRS="/data1/yarn/nm/usercache/dx/appcache/application_1559203334026_0010,/data2/yarn/nm/usercache/dx/appcache/application_1559203334026_0010,/data3/ya 120 rn/nm/usercache/dx/appcache/application_1559203334026_0010,/data4/yarn/nm/usercache/dx/appcache/application_1559203334026_0010,/data5/yarn/nm/usercache/dx/appcach 121 e/application_1559203334026_0010,/data6/yarn/nm/usercache/dx/appcache/application_1559203334026_0010,/opt/yarn/nm/usercache/dx/appcache/application_1559203334026_00 122 10" 123 export NM_HTTP_PORT="8042" 124 export SPARK_DIST_CLASSPATH="/opt/cloudera/parcels/SPARK2-2.4.0.cloudera1-1.cdh5.13.3.p0.1007356/lib/spark2/kafka-0.10/*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/jars/xmlenc-0.52 125 .jar:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/jars/*.jar:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/LICENSE.txt:/op 126 t/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop/NOTICE.txt" 127 export LOG_DIRS="/data1/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003,/data2/yarn/container-logs/application_1559203334026_0 128 
010/container_1559203334026_0010_01_000003,/data3/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003,/data4/yarn/container-logs/a 129 pplication_1559203334026_0010/container_1559203334026_0010_01_000003,/data5/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003,/d 130 ata6/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003,/opt/yarn/container-logs/application_1559203334026_0010/container_1559203 131 334026_0010_01_000003" 132 export NM_AUX_SERVICE_mapreduce_shuffle="AAA0+gAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA= 133 " 134 export NM_PORT="8041" 135 export USER="dx" 136 export HADOOP_YARN_HOME="/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/lib/hadoop-yarn" 137 export CLASSPATH="$PWD:$PWD/__spark_conf__:$PWD/__spark_libs__/*:$HADOOP_CLIENT_CONF_DIR:$HADOOP_CONF_DIR:$HADOOP_COMMON_HOME/*:$HADOOP_COMMON_HOME/lib/*:$HADOOP_HDFS_H 138 OME/*:$HADOOP_HDFS_HOME/lib/*:$HADOOP_YARN_HOME/*:$HADOOP_YARN_HOME/lib/*:$HADOOP_MAPRED_HOME/*:$HADOOP_MAPRED_HOME/lib/*:$MR2_CLASSPATH:/opt/cloudera/parcels/SPARK2-2. 139 4.0.cloudera1-1.cdh5.13.3.p0.1007356/lib/spark2/kafka-0.10/*:/opt/cloudera/parcels/CDH-5.13.0-1.cdh5.13.0.p0.29/jars/*.jar 140 OTICE.txt:$PWD/__spark_conf__/__hadoop_conf__" 141 export HADOOP_TOKEN_FILE_LOCATION="/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/container_tokens" 142 export NM_AUX_SERVICE_spark_shuffle="" 143 export SPARK_USER="dx" 144 export SPARK_LOG_URL_STDERR="http://CDH-141:8042/node/containerlogs/container_1559203334026_0010_01_000003/dx/stderr?start=-4096" 145 export HOME="/home/" 146 export CONTAINER_ID="container_1559203334026_0010_01_000003" 147 export MALLOC_ARENA_MAX="4" 148 ln -sf "/data5/yarn/nm/usercache/dx/filecache/1427931/kafka-clients-0.10.0.1.jar" "kafka-clients-0.10.0.1.jar" 149 hadoop_shell_errorcode=$? 150 if [ $hadoop_shell_errorcode -ne 0 ] 151 then 152 exit $hadoop_shell_errorcode 153 fi 154 ln -sf "/data6/yarn/nm/usercache/dx/filecache/1427932/elasticsearch-spark-20_2.11-6.4.1.jar" "elasticsearch-spark-20_2.11-6.4.1.jar" 155 hadoop_shell_errorcode=$? 156 if [ $hadoop_shell_errorcode -ne 0 ] 157 then 158 exit $hadoop_shell_errorcode 159 fi 160 ln -sf "/opt/yarn/nm/usercache/dx/filecache/1427933/__spark_libs__3031377885391114478.zip" "__spark_libs__" 161 hadoop_shell_errorcode=$? 162 if [ $hadoop_shell_errorcode -ne 0 ] 163 then 164 exit $hadoop_shell_errorcode 165 fi 166 ln -sf "/data6/yarn/nm/usercache/dx/filecache/1427925/dx_Spark_State_Store_Plugin-1.0-SNAPSHOT.jar" "dx_Spark_State_Store_Plugin-1.0-SNAPSHOT.jar" 167 hadoop_shell_errorcode=$? 168 if [ $hadoop_shell_errorcode -ne 0 ] 169 then 170 exit $hadoop_shell_errorcode 171 fi 172 ln -sf "/data3/yarn/nm/usercache/dx/filecache/1427929/spark-sql-kafka-0-10_2.11-2.4.0.jar" "spark-sql-kafka-0-10_2.11-2.4.0.jar" 173 hadoop_shell_errorcode=$? 174 if [ $hadoop_shell_errorcode -ne 0 ] 175 then 176 exit $hadoop_shell_errorcode 177 fi 178 ln -sf "/data4/yarn/nm/usercache/dx/filecache/1427923/streaming-common-3.0.0.jar" "streaming-common-3.0.0.jar" 179 hadoop_shell_errorcode=$? 180 if [ $hadoop_shell_errorcode -ne 0 ] 181 then 182 exit $hadoop_shell_errorcode 183 fi 184 ln -sf "/data1/yarn/nm/usercache/dx/filecache/1427934/spark-avro_2.11-3.2.0.jar" "spark-avro_2.11-3.2.0.jar" 185 hadoop_shell_errorcode=$? 
186 if [ $hadoop_shell_errorcode -ne 0 ] 187 then 188 exit $hadoop_shell_errorcode 189 fi 190 ln -sf "/data2/yarn/nm/usercache/dx/filecache/1427928/bijection-avro_2.11-0.9.5.jar" "bijection-avro_2.11-0.9.5.jar" 191 hadoop_shell_errorcode=$? 192 if [ $hadoop_shell_errorcode -ne 0 ] 193 then 194 exit $hadoop_shell_errorcode 195 fi 196 ln -sf "/data2/yarn/nm/usercache/dx/filecache/1427935/shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar" "shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar" 197 hadoop_shell_errorcode=$? 198 if [ $hadoop_shell_errorcode -ne 0 ] 199 then 200 exit $hadoop_shell_errorcode 201 fi 202 ln -sf "/data1/yarn/nm/usercache/dx/filecache/1427927/bijection-core_2.11-0.9.5.jar" "bijection-core_2.11-0.9.5.jar" 203 hadoop_shell_errorcode=$? 204 if [ $hadoop_shell_errorcode -ne 0 ] 205 then 206 exit $hadoop_shell_errorcode 207 fi 208 ln -sf "/data5/yarn/nm/usercache/dx/filecache/1427924/rocksdbjni-5.17.2.jar" "rocksdbjni-5.17.2.jar" 209 hadoop_shell_errorcode=$? 210 if [ $hadoop_shell_errorcode -ne 0 ] 211 then 212 exit $hadoop_shell_errorcode 213 fi 214 ln -sf "/opt/yarn/nm/usercache/dx/filecache/1427926/__spark_conf__.zip" "__spark_conf__" 215 hadoop_shell_errorcode=$? 216 if [ $hadoop_shell_errorcode -ne 0 ] 217 then 218 exit $hadoop_shell_errorcode 219 fi 220 ln -sf "/data4/yarn/nm/usercache/dx/filecache/1427930/dx-domain-perf-3.0.0.jar" "dx-domain-perf-3.0.0.jar" 221 hadoop_shell_errorcode=$? 222 if [ $hadoop_shell_errorcode -ne 0 ] 223 then 224 exit $hadoop_shell_errorcode 225 fi 226 exec /bin/bash -c "$JAVA_HOME/bin/java -server -Xmx6144m -Djava.io.tmpdir=$PWD/tmp 227 '-Dspark.driver.port=50365' 228 '-Dspark.network.timeout=10000000' 229 '-Dspark.port.maxRetries=32' 230 -Dspark.yarn.app.container.log.dir=/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003 231 -XX:OnOutOfMemoryError='kill %p' 232 org.apache.spark.executor.CoarseGrainedExecutorBackend 233 --driver-url spark://CoarseGrainedScheduler@CDH-143:50365 234 --executor-id 2 235 --hostname CDH-141 236 --cores 2 237 --app-id application_1559203334026_0010 238 --user-class-path file:$PWD/__app__.jar 239 --user-class-path file:$PWD/dx-domain-perf-3.0.0.jar 240 --user-class-path file:$PWD/streaming-common-3.0.0.jar 241 --user-class-path file:$PWD/spark-sql-kafka-0-10_2.11-2.4.0.jar 242 --user-class-path file:$PWD/spark-avro_2.11-3.2.0.jar 243 --user-class-path file:$PWD/shc-core-1.1.2-2.2-s_2.11-SNAPSHOT.jar 244 --user-class-path file:$PWD/rocksdbjni-5.17.2.jar 245 --user-class-path file:$PWD/kafka-clients-0.10.0.1.jar 246 --user-class-path file:$PWD/elasticsearch-spark-20_2.11-6.4.1.jar 247 --user-class-path file:$PWD/dx_Spark_State_Store_Plugin-1.0-SNAPSHOT.jar 248 --user-class-path file:$PWD/bijection-core_2.11-0.9.5.jar 249 --user-class-path file:$PWD/bijection-avro_2.11-0.9.5.jar 250 1>/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003/stdout 251 2>/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003/stderr" 252 hadoop_shell_errorcode=$? 253 if [ $hadoop_shell_errorcode -ne 0 ] 254 then 255 exit $hadoop_shell_errorcode 256 fi 257 [root@CDH-141 dx]#
YARN-Client architecture
The flow is as follows:
- The Spark YARN client asks YARN's ResourceManager to start an ApplicationMaster. At the same time, SparkContext initialization creates the DAGScheduler, TaskScheduler and so on; since we are in yarn-client mode, the program chooses YarnScheduler (YarnClientClusterScheduler in older versions) and YarnClientSchedulerBackend;
- After receiving the request, the ResourceManager picks a NodeManager in the cluster, allocates the first container for the application and asks it to start the application's ApplicationMaster there. The difference from YARN-Cluster is that this ApplicationMaster does not run the SparkContext; it only liaises with the SparkContext for resource allocation;
- After the SparkContext in the client finishes initializing, it establishes communication with the ApplicationMaster, registers with the ResourceManager, and requests resources (containers) from the ResourceManager according to the task requirements;
- Once the ApplicationMaster has obtained resources (containers), it talks to the corresponding NodeManagers and asks them to start CoarseGrainedExecutorBackend in those containers; after starting, each CoarseGrainedExecutorBackend registers with the SparkContext in the client and requests Tasks;
- The SparkContext in the client assigns Tasks to the CoarseGrainedExecutorBackends, which run them and report status and progress back to the Driver, so the client always knows the state of each task and can restart failed tasks;
- When the application has finished, the client's SparkContext asks the ResourceManager to deregister the application and shuts itself down.
Client mode vs. Cluster mode
- Before digging into the difference between YARN-Client and YARN-Cluster, it helps to understand the ApplicationMaster concept. In YARN, every application instance has an ApplicationMaster process, started in the application's first container. It is responsible for talking to the ResourceManager to request resources, and for telling NodeManagers to start containers once resources are obtained. At a deeper level, the difference between YARN-Cluster and YARN-Client is the difference in their ApplicationMaster processes;
- In YARN-Cluster mode, the Driver runs inside the AM (ApplicationMaster), which requests resources from YARN and supervises the job. After the user submits the job, the client can be closed and the job keeps running on YARN, which makes YARN-Cluster unsuitable for interactive jobs;
- In YARN-Client mode, the ApplicationMaster only asks YARN for Executors, and the client communicates with the allocated containers to schedule their work, which means the client cannot go away;
Key classes involved in the submission:
JavaMainApplication
StandaloneAppClient
SparkSubmitArguments
ApplicationMaster
ClientApp
LauncherBackend
YarnClient
YarnClientImpl
ApplicationClientProtocol
ApplicationClientProtocolPBClientImpl