[Original] Uncle's Experience Sharing (19): Spark on YARN job progress stays at 10% after submission


spark 2.1.1

Our system needs to monitor the execution progress of Spark on YARN jobs, but we found that after a job is submitted the reported progress stays at 10% the whole time; only when the job finally succeeds or fails does it suddenly jump to 100%, which looks very strange.
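One way to observe this is to poll YARN for the application report. A minimal sketch using the Hadoop YarnClient API (the application id below is a placeholder):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.util.ConverterUtils

object ProgressProbe {
  def main(args: Array[String]): Unit = {
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new Configuration())
    yarnClient.start()

    // placeholder application id
    val appId = ConverterUtils.toApplicationId("application_1500000000000_0001")
    val report = yarnClient.getApplicationReport(appId)
    // for a running Spark on YARN app this keeps reporting progress=0.1
    println(s"state=${report.getYarnApplicationState}, progress=${report.getProgress}")

    yarnClient.stop()
  }
}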

 

Let's look at the Spark on YARN job submission process:

 

When a job is submitted to YARN, spark-submit replaces the main class with Client:

childMainClass = "org.apache.spark.deploy.yarn.Client"

For details on the spark-submit process, see: https://www.cnblogs.com/barneywill/p/9820684.html
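For example, a submission in YARN cluster mode like the following goes through this code path (the class name and jar path are placeholders):

spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --class com.example.MyApp \
  /path/to/my-app.jar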

 

Now let's look at how Client runs:

org.apache.spark.deploy.yarn.Client

  def main(argStrings: Array[String]) {
...
    val sparkConf = new SparkConf
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    sparkConf.remove("spark.jars")
    sparkConf.remove("spark.files")
    val args = new ClientArguments(argStrings)
    new Client(args, sparkConf).run()
...

  def run(): Unit = {
    this.appId = submitApplication()
...

  def submitApplication(): ApplicationId = {
...
      val containerContext = createContainerLaunchContext(newAppResponse)
...

  private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
    : ContainerLaunchContext = {
...
    val amClass =
      if (isClusterMode) {
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }

The call chain here is Client.main -> run -> submitApplication -> createContainerLaunchContext, and the latter sets amClass. In both modes we eventually end up in ApplicationMaster, because ExecutorLauncher (used in client mode) is just a thin wrapper that delegates to ApplicationMaster, as shown below:

org.apache.spark.deploy.yarn.ExecutorLauncher

object ExecutorLauncher {

  def main(args: Array[String]): Unit = {
    ApplicationMaster.main(args)
  }

}

 

Now let's look at ApplicationMaster:

org.apache.spark.deploy.yarn.ApplicationMaster

  def main(args: Array[String]): Unit = {
...
    SparkHadoopUtil.get.runAsSparkUser { () =>
      master = new ApplicationMaster(amArgs, new YarnRMClient)
      System.exit(master.run())
    }
...

  final def run(): Int = {
...
      if (isClusterMode) {
        runDriver(securityMgr)
      } else {
        runExecutorLauncher(securityMgr)
      }
...

  private def registerAM(
      _sparkConf: SparkConf,
      _rpcEnv: RpcEnv,
      driverRef: RpcEndpointRef,
      uiAddress: String,
      securityMgr: SecurityManager) = {
...
    allocator = client.register(driverUrl,
      driverRef,
      yarnConf,
      _sparkConf,
      uiAddress,
      historyAddress,
      securityMgr,
      localResources)

    allocator.allocateResources()
    reporterThread = launchReporterThread()
...
  private def launchReporterThread(): Thread = {
    // The number of failures in a row until Reporter thread give up
    val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)

    val t = new Thread {
      override def run() {
        var failureCount = 0
        while (!finished) {
          try {
            if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
              finish(FinalApplicationStatus.FAILED,
                ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
                s"Max number of executor failures ($maxNumExecutorFailures) reached")
            } else {
              logDebug("Sending progress")
              allocator.allocateResources()
            }
...

The call chain here is ApplicationMaster.main -> run; run calls either runDriver (cluster mode) or runExecutorLauncher (client mode), and both eventually reach registerAM, which calls YarnAllocator.allocateResources. launchReporterThread then starts a thread that keeps calling YarnAllocator.allocateResources in a loop. Now let's look at YarnAllocator:

org.apache.spark.deploy.yarn.YarnAllocator

  def allocateResources(): Unit = synchronized {
    updateResourceRequests()

    val progressIndicator = 0.1f
    // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
    // requests.
    val allocateResponse = amClient.allocate(progressIndicator)

As you can see, the progress reported to the ResourceManager here is hard-coded to 0.1, i.e. 10%. That is why the progress of a Spark on YARN application stays at 10% for its whole lifetime, and why trying to monitor job progress through YARN is futile.
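If real progress is needed, one workaround is to measure it on the Spark side instead of asking YARN, for example with a SparkListener that compares finished tasks to the total tasks of all submitted stages. A minimal sketch (the class name and the rough progress formula are my own, not something Spark provides):

import org.apache.spark.scheduler._

// Approximate progress = finished tasks / total tasks of all stages submitted so far.
// Register it via sparkContext.addSparkListener(new ProgressListener)
// or through the spark.extraListeners configuration.
class ProgressListener extends SparkListener {
  private var totalTasks = 0L
  private var finishedTasks = 0L

  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    totalTasks += stageSubmitted.stageInfo.numTasks
  }

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    finishedTasks += 1
    val progress = if (totalTasks == 0) 0.0 else finishedTasks.toDouble / totalTasks
    println(f"approximate progress: ${progress * 100}%.1f%%")
  }
}

Alternatively, the Spark UI's REST API (/api/v1/applications/<app-id>/jobs) exposes numCompletedTasks and numTasks per job, which can be polled from outside the application.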

 

