sparkLauncher記錄(spark-2.2.0)


sparkLauncher 代碼記錄

1.概述
    sparkLauncher 是一個在代碼里提交spark任務的類
    這個類底層使用的依然是spark-submit腳本進行提交,通過ProcessBuilder 來設置相關環境參數調用
    主要的方法有下面幾個
    launch                   提交一個任務,任務的提交輸出結果如何由用戶自己處理
    createBuilder          launch和startApplication方法中調用生成ProcessBuilder執行shell腳本對象的方法
    startApplication      提交一個任務,並根據監聽任務狀態的改變來執行用戶指定的listener

2.launch方法

   //這個方法是在設置完sparkLauncher的其它參數方法之后調用這個方法執行任務代碼
   //通過createBuilder 創建一個Process 的對象然后執行

    public Process launch() throws IOException {
        Process childProc = createBuilder().start();     //在這里調用
        if (redirectToLog) {
          String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
          new OutputRedirector(childProc.getInputStream(), loggerName, REDIRECTOR_FACTORY);
        }
        return childProc;
    }

3.createBuilder方法
  //這個方法是構造執行命令,生成ProcessBuilder的函數
  //通過對象設置的javahome,sparkhome 找到對應的spark-submit腳本然后構建參數,生成cmd 命令,最后放到processBuilder中生成執行子進程
  
  private ProcessBuilder createBuilder() {
    List<String> cmd = new ArrayList<>();
    String script = isWindows() ? "spark-submit.cmd" : "spark-submit";   //找到對應的腳本
    cmd.add(join(File.separator, builder.getSparkHome(), "bin", script));   //設置腳本絕對路徑
    cmd.addAll(builder.buildSparkSubmitArgs());  //腳本的設置參數

    //下面的就是做一些目錄檢查,系統檢查,標准輸出錯誤輸出等設置之類的。

    // Since the child process is a batch script, let's quote things so that special characters are
    // preserved, otherwise the batch interpreter will mess up the arguments. Batch scripts are
    // weird.
    if (isWindows()) {
      List<String> winCmd = new ArrayList<>();
      for (String arg : cmd) {
        winCmd.add(quoteForBatchScript(arg));
      }
      cmd = winCmd;
    }
    
    //這里創建的ProcessBuilder對象,並設置環境變量(sparkConf通過環境變量獲取spark相關配置,所以把參數設置到環境變量中可以讓sparkConf獲取到)
    ProcessBuilder pb = new ProcessBuilder(cmd.toArray(new String[cmd.size()]));
    for (Map.Entry<String, String> e : builder.childEnv.entrySet()) {
      pb.environment().put(e.getKey(), e.getValue());
    }

    if (workingDir != null) {
      pb.directory(workingDir);
    }

    // Only one of redirectError and redirectError(...) can be specified.
    // Similarly, if redirectToLog is specified, no other redirections should be specified.
    checkState(!redirectErrorStream || errorStream == null,
      "Cannot specify both redirectError() and redirectError(...) ");
    checkState(!redirectToLog ||
      (!redirectErrorStream && errorStream == null && outputStream == null),
      "Cannot used redirectToLog() in conjunction with other redirection methods.");

    if (redirectErrorStream || redirectToLog) {
      pb.redirectErrorStream(true);
    }
    if (errorStream != null) {
      pb.redirectError(errorStream);
    }
    if (outputStream != null) {
      pb.redirectOutput(outputStream);
    }

    return pb;
  }

3.startApplication方法
  //這個方法也是用來執行任務的。只是比launch 多了一個狀態監控的功能,用戶可以通過提供sparkAppHandle.Listener來實現任務狀態變化時做一些動作

  public SparkAppHandle startApplication(SparkAppHandle.Listener... listeners) throws IOException {
    ChildProcAppHandle handle = LauncherServer.newAppHandle();//這里啟動一個LauncherServer的服務來接收app狀態變化的數據,更詳細的將會在其它部分記錄
    for (SparkAppHandle.Listener l : listeners) {
      handle.addListener(l); //這里把用戶提供的監聽放到執行函數里執行,具體如何實現將在其它部分記錄
    }

    //這里就是獲取spark-submit標准輸出數據的log文件名稱,如果沒有設置,就通過下面代碼進行生成和獲取,根據類名和包名自動生成
    String loggerName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
    ProcessBuilder pb = createBuilder();
    // Only setup stderr + stdout to logger redirection if user has not otherwise configured output
    // redirection.
    if (loggerName == null) {
      String appName = builder.getEffectiveConfig().get(CHILD_PROCESS_LOGGER_NAME);
      if (appName == null) {
        if (builder.appName != null) {
          appName = builder.appName;
        } else if (builder.mainClass != null) {
          int dot = builder.mainClass.lastIndexOf(".");
          if (dot >= 0 && dot < builder.mainClass.length() - 1) {
            appName = builder.mainClass.substring(dot + 1, builder.mainClass.length());
          } else {
            appName = builder.mainClass;
          }
        } else if (builder.appResource != null) {
          appName = new File(builder.appResource).getName();
        } else {
          appName = String.valueOf(COUNTER.incrementAndGet());
        }
      }
      String loggerPrefix = getClass().getPackage().getName();  //獲取包名
      loggerName = String.format("%s.app.%s", loggerPrefix, appName);  //根據包名和appname生成log文件名
      pb.redirectErrorStream(true);
    }

    //這里把LauncherServer的端口號通告到環境變量里,LauncherBackend就是通過環境變量獲取LauncherServer的端口號進行通信的。
   //LauncherBackend 的具體詳情將會在其他部分進行記錄
    pb.environment().put(LauncherProtocol.ENV_LAUNCHER_PORT,
      String.valueOf(LauncherServer.getServerInstance().getPort()));
    pb.environment().put(LauncherProtocol.ENV_LAUNCHER_SECRET, handle.getSecret());
    try {
      handle.setChildProc(pb.start(), loggerName);  //啟動spark-submit 提交任務,並把標准log輸出到之前設置的loggername里。
    } catch (IOException ioe) {
      handle.kill();
      throw ioe;
    }

    return handle;
  }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM