Source code analysis of org.apache.spark.launcher.Main


 public static void main(String[] argsArray) throws Exception {
    //org.apache.spark.launcher.Main
    checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");
  /**
  * java -cp spark_home/lib/spark-assembly-1.6.0-hadoop2.6.0.jar org.apache.spark.launcher.Main  org.apache.spark.deploy.SparkSubmit
  *      --class org.apache.spark.repl.Main --name "Spark shell" --master spark://ip:7077
     In the end, this main method hands "org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell" --master spark://luyl152:7077"
     back to spark-class, which executes it via exec "${CMD[@]}".
  */


    List<String> args = new ArrayList<>(Arrays.asList(argsArray));
    //when spark-shell is launched, the first argument is org.apache.spark.deploy.SparkSubmit
    String className = args.remove(0);//remove the org.apache.spark.deploy.SparkSubmit argument from args
    //in spark-class (or another config file) you can export SPARK_PRINT_LAUNCH_COMMAND=<any non-empty value>
    //to have the cmd printed, i.e. the value run by exec "${CMD[@]}" in spark-class
    boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
    AbstractCommandBuilder builder;//create the command builder
    if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
      try {
        //parse the arguments into the corresponding Spark fields, e.g. the value of --class goes into the mainClass field.
        //any leftover arguments are added to the sparkArgs list inside SparkSubmitCommandBuilder
        builder = new SparkSubmitCommandBuilder(args);
      } catch (IllegalArgumentException e) {
        printLaunchCommand = false;
        System.err.println("Error: " + e.getMessage());
        System.err.println();

        MainClassOptionParser parser = new MainClassOptionParser();
        try {
          parser.parse(args);
        } catch (Exception ignored) {
          // Ignore parsing exceptions.
        }

        List<String> help = new ArrayList<>();
        if (parser.className != null) {
          help.add(parser.CLASS);
          help.add(parser.className);
        }
        help.add(parser.USAGE_ERROR);
        builder = new SparkSubmitCommandBuilder(help);
      }
    } else {
      //if the first argument is not org.apache.spark.deploy.SparkSubmit, use SparkClassCommandBuilder as the parser
      builder = new SparkClassCommandBuilder(className, args);
    }

    Map<String, String> env = new HashMap<>();
    List<String> cmd = builder.buildCommand(env);//overridden in SparkSubmitCommandBuilder, not the abstract class's version
    if (printLaunchCommand) {
      System.err.println("Spark Command: " + join(" ", cmd));
      System.err.println("========================================");
    }

    if (isWindows()) {
      System.out.println(prepareWindowsCommand(cmd, env));
    } else {
      // In bash, use NULL as the arg separator since it cannot be used in an argument.

      //the effective arguments are returned (by printing them) to spark-class's exec "${CMD[@]}"
      /** '\0' and a space are not the same thing.
          '\0' is the string terminator: it marks the end of a string, while a space is an ordinary character visible in text.
          '\0' has ASCII code 0 and a space has ASCII code 32; they are different characters.
          Programs conventionally use '\0' to terminate strings, whereas a space is just a text character.
      */

      List<String> bashCmd = prepareBashCommand(cmd, env);
      for (String c : bashCmd) {
        System.out.print(c);//print the command tokens, which are read back into CMD by spark-class
        System.out.print('\0');
      }
    }
  }
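
To make the NUL-separated hand-off concrete, here is a minimal standalone sketch (not Spark code; the class name and the sample command string are made up for illustration) of how a caller such as spark-class can split the bytes printed by Main back into individual command tokens before handing them to exec "${CMD[@]}".

import java.util.ArrayList;
import java.util.List;

public class NulSeparatedDemo {
  public static void main(String[] args) {
    // Pretend this is what org.apache.spark.launcher.Main printed to stdout:
    // each token is followed by a '\0' terminator.
    String rawOutput = "/usr/bin/java\0-Xmx1g\0org.apache.spark.deploy.SparkSubmit\0";

    // Split on '\0', which is what the read loop in spark-class does in bash.
    List<String> cmd = new ArrayList<>();
    StringBuilder current = new StringBuilder();
    for (char c : rawOutput.toCharArray()) {
      if (c == '\0') {          // end of one argument
        cmd.add(current.toString());
        current.setLength(0);
      } else {
        current.append(c);      // spaces inside an argument are preserved
      }
    }
    System.out.println(cmd);    // [/usr/bin/java, -Xmx1g, org.apache.spark.deploy.SparkSubmit]
  }
}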
 // org.apache.spark.launcher.SparkSubmitCommandBuilder
  SparkSubmitCommandBuilder(List<String> args) {
    // args looks like: --class org.apache.spark.repl.Main --name "Spark shell" --master spark://ip:7077
    // or: pyspark-shell-main --name PySparkShell
    // or: --master yarn --deploy-mode cluster --name Hbase --verbose --conf xxxx
    // i.e. for PYSPARK_SHELL, SPARKR_SHELL and RUN_EXAMPLE launches, the first argument is one of those marker names
    this.allowsMixedArguments = false;
    this.sparkArgs = new ArrayList<>();
    boolean isExample = false;
    List<String> submitArgs = args;

    if (args.size() > 0) {
      switch (args.get(0)) {
        //the first argument is pyspark-shell-main when launched from Python
        case PYSPARK_SHELL:
          this.allowsMixedArguments = true;
          appResource = PYSPARK_SHELL;
          submitArgs = args.subList(1, args.size());
          break;

        case SPARKR_SHELL: //"sparkr-shell-main"
          this.allowsMixedArguments = true;
          appResource = SPARKR_SHELL;
          submitArgs = args.subList(1, args.size());
          break;

        case RUN_EXAMPLE:
          isExample = true;
          submitArgs = args.subList(1, args.size());
      }

      this.isExample = isExample;
      //assigns the values passed in by spark-submit to the corresponding Spark fields, e.g. the value of --class goes into mainClass
      //submitArgs has the PYSPARK_SHELL / SPARKR_SHELL / RUN_EXAMPLE marker argument stripped off
      //OptionParser is an inner class of org.apache.spark.launcher.SparkSubmitCommandBuilder:
      //private class OptionParser extends SparkSubmitOptionParser
      //it overrides the handle* family of methods
      OptionParser parser = new OptionParser(); 
      parser.parse(submitArgs); //implemented in the parent class, SparkSubmitOptionParser
      this.isAppResourceReq = parser.isAppResourceReq;
    }  else {
      this.isExample = isExample;
      this.isAppResourceReq = false;
    }
  }
 /**
   * Parse a list of spark-submit command line options.
   * <p>
   * See SparkSubmitArguments.scala for a more formal description of available options.
   *
   * @throws IllegalArgumentException If an error is found during parsing.
   *  The args look like: --class org.apache.spark.repl.Main --name "Spark shell" --master spark://ip:7077.
   *  This assigns the values passed in by spark-submit to the corresponding Spark fields,
   *  e.g. the value of --class goes into the mainClass field.
   *  org.apache.spark.launcher.SparkSubmitOptionParser#parse
  */
  
  protected final void parse(List<String> args) {
    //spark-submit can pass SparkConf settings via --conf PROP=VALUE; the full option list is at the end of
    //org.apache.spark.deploy.SparkSubmitArguments, or run spark-submit -h to see it
    Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

    int idx = 0;
    for (idx = 0; idx < args.size(); idx++) {
      String arg = args.get(idx);
      String value = null;

      Matcher m = eqSeparatedOpt.matcher(arg);
      if (m.matches()) {
        arg = m.group(1);//the option name, e.g. --name
        value = m.group(2);//its value
      }

      // Look for options with a value.
      //this looks for a "--" option that follows spark-submit, e.g. "--class" from args, matched against the opts table;
      //on a match the canonical name (e.g. --class) is returned, otherwise null
      String name = findCliOption(arg, opts);
      if (name != null) {
        if (value == null) {
          if (idx == args.size() - 1) {//matched but no value follows, e.g. a lone --class: size is 1, idx is 0, 1 - 1 = 0, so error
            throw new IllegalArgumentException(
                String.format("Missing argument for option '%s'.", arg));
          }
          idx++;
          value = args.get(idx); //if there is a value, it is the element right after the option at idx
        }
        //name is the spark-submit option such as --class, and value is its argument
        //OptionParser is an inner class of org.apache.spark.launcher.SparkSubmitCommandBuilder:
        //private class OptionParser extends SparkSubmitOptionParser
        //OptionParser provides the implementation: it assigns the value to the corresponding Spark field,
        //e.g. the value of --class goes into mainClass (the implementation is straightforward, so it is not shown here)
        if (!handle(name, value)) { //calls handle() as overridden in OptionParser
          break;
        }
        continue;
      }

      // Look for a switch.
      // if nothing matched above, check whether the argument is a switch such as --verbose
      name = findCliOption(arg, switches);
      if (name != null) {
        if (!handle(name, null)) {//calls handle() as overridden in OptionParser
          break;
        }
        continue;
      }

      if (!handleUnknown(arg)) {//calls handleUnknown() as overridden in OptionParser
        break;
      }
    }

    if (idx < args.size()) {
      idx++;
    }
    //the remaining arguments are added to sparkArgs, initialized in SparkSubmitCommandBuilder() { this.sparkArgs = new ArrayList<String>(); ... }
    handleExtraArgs(args.subList(idx, args.size()));//calls handleExtraArgs() as overridden in OptionParser
  }
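
To see what the --option=value splitting at the top of parse() does, here is a small self-contained sketch (the demo class is hypothetical; only the regex is taken from the code above) showing how eqSeparatedOpt behaves on typical spark-submit arguments.

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EqSeparatedOptDemo {
  public static void main(String[] args) {
    // Same pattern as in SparkSubmitOptionParser.parse().
    Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

    // "--name=Hbase" matches: group(1) is the option name, group(2) is its value.
    Matcher m1 = eqSeparatedOpt.matcher("--name=Hbase");
    if (m1.matches()) {
      System.out.println(m1.group(1) + " -> " + m1.group(2)); // prints: --name -> Hbase
    }

    // A bare "--master" does not match; parse() then takes its value from the next element of args.
    System.out.println(eqSeparatedOpt.matcher("--master").matches()); // prints: false
  }
}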
//definitions of some of the option keys
 protected final String CLASS = "--class";
  protected final String CONF = "--conf";
  protected final String DEPLOY_MODE = "--deploy-mode";
  protected final String DRIVER_CLASS_PATH = "--driver-class-path";
  protected final String DRIVER_CORES = "--driver-cores";
  protected final String DRIVER_JAVA_OPTIONS =  "--driver-java-options";
  protected final String DRIVER_LIBRARY_PATH = "--driver-library-path";
  protected final String DRIVER_MEMORY = "--driver-memory";
  protected final String EXECUTOR_MEMORY = "--executor-memory";
  protected final String FILES = "--files";
  protected final String JARS = "--jars";
  protected final String KILL_SUBMISSION = "--kill";
  protected final String MASTER = "--master";
  protected final String NAME = "--name";
  protected final String PACKAGES = "--packages";
  protected final String PACKAGES_EXCLUDE = "--exclude-packages";
  protected final String PROPERTIES_FILE = "--properties-file";
  protected final String PROXY_USER = "--proxy-user";
  protected final String PY_FILES = "--py-files";
  protected final String REPOSITORIES = "--repositories";
  protected final String STATUS = "--status";
  protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";

  // Options that do not take arguments.
  protected final String HELP = "--help";
  protected final String SUPERVISE = "--supervise";
  protected final String USAGE_ERROR = "--usage-error";
  protected final String VERBOSE = "--verbose";
  protected final String VERSION = "--version";

  // Standalone-only options.

  // YARN-only options.
  protected final String ARCHIVES = "--archives";
  protected final String EXECUTOR_CORES = "--executor-cores";
  protected final String KEYTAB = "--keytab";
  protected final String NUM_EXECUTORS = "--num-executors";
  protected final String PRINCIPAL = "--principal";
  protected final String QUEUE = "--queue";

  /**
   * This is the canonical list of spark-submit options. Each entry in the array contains the
   * different aliases for the same option; the first element of each entry is the "official"
   * name of the option, passed to {@link #handle(String, String)}.
   * <p>
   * Options not listed here nor in the "switch" list below will result in a call to
   * {@link #handleUnknown(String)}.
   * <p>
   * These two arrays are visible for tests.
   */
  final String[][] opts = {
    { ARCHIVES },
    { CLASS },
    { CONF, "-c" },
    { DEPLOY_MODE },
    { DRIVER_CLASS_PATH },
    { DRIVER_CORES },
    { DRIVER_JAVA_OPTIONS },
    { DRIVER_LIBRARY_PATH },
    { DRIVER_MEMORY },
    { EXECUTOR_CORES },
    { EXECUTOR_MEMORY },
    { FILES },
    { JARS },
    { KEYTAB },
    { KILL_SUBMISSION },
    { MASTER },
    { NAME },
    { NUM_EXECUTORS },
    { PACKAGES },
    { PACKAGES_EXCLUDE },
    { PRINCIPAL },
    { PROPERTIES_FILE },
    { PROXY_USER },
    { PY_FILES },
    { QUEUE },
    { REPOSITORIES },
    { STATUS },
    { TOTAL_EXECUTOR_CORES },
  };

  /**
   * List of switches (command line options that do not take parameters) recognized by spark-submit.
   */
  final String[][] switches = {
    { HELP, "-h" },
    { SUPERVISE },
    { USAGE_ERROR },
    { VERBOSE, "-v" },
    { VERSION },
  };
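
findCliOption, used by parse() above, simply scans these tables for the given argument and returns the canonical (first) alias of the matching entry, or null if nothing matches. A simplified stand-in (a hypothetical demo class, not the actual Spark implementation) is shown below.

public class FindCliOptionDemo {
  // Simplified stand-in for SparkSubmitOptionParser.findCliOption():
  // returns the official (first) name of the entry whose aliases contain arg, or null.
  static String findCliOption(String arg, String[][] available) {
    for (String[] candidates : available) {
      for (String candidate : candidates) {
        if (candidate.equals(arg)) {
          return candidates[0];
        }
      }
    }
    return null;
  }

  public static void main(String[] args) {
    String[][] opts = { { "--conf", "-c" }, { "--class" } };
    System.out.println(findCliOption("-c", opts));      // --conf (alias resolved to the official name)
    System.out.println(findCliOption("--class", opts)); // --class
    System.out.println(findCliOption("--bogus", opts)); // null, so parse() falls through to handleUnknown()
  }
}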
private class OptionParser extends SparkSubmitOptionParser {

    boolean isAppResourceReq = true;
    /**
     * Assigns the values passed in by spark-submit to the corresponding Spark fields.
     */
    @Override
    protected boolean handle(String opt, String value) {
      switch (opt) {
        case MASTER:
          master = value;
          break;
        case DEPLOY_MODE:
          deployMode = value;
          break;
        case PROPERTIES_FILE:
          propertiesFile = value;
          break;
        case DRIVER_MEMORY:
          conf.put(SparkLauncher.DRIVER_MEMORY, value);
          break;
        case DRIVER_JAVA_OPTIONS:
          conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
          break;
        case DRIVER_LIBRARY_PATH:
          conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
          break;
        case DRIVER_CLASS_PATH:
          conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
          break;
        case CONF:
          String[] setConf = value.split("=", 2);
          checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
          conf.put(setConf[0], setConf[1]);
          break;
        case CLASS:
          // The special classes require some special command line handling, since they allow
          // mixing spark-submit arguments with arguments that should be propagated to the shell
          // itself. Note that for this to work, the "--class" argument must come before any
          // non-spark-submit arguments.
          mainClass = value;
          if (specialClasses.containsKey(value)) {
            allowsMixedArguments = true;
            appResource = specialClasses.get(value);
          }
          break;
................................................
 @Override
  public List<String> buildCommand(Map<String, String> env) //SparkSubmitCommandBuilder's override of buildCommand(Map)
      throws IOException, IllegalArgumentException {
        //PYSPARK_SHELL identifies the Python shell, SPARKR_SHELL the R shell
    if (PYSPARK_SHELL.equals(appResource) && isAppResourceReq) {
      return buildPySparkShellCommand(env);
    } else if (SPARKR_SHELL.equals(appResource) && isAppResourceReq) {
      return buildSparkRCommand(env);
    } else {
      //env here is just an empty Map; buildSparkSubmitCommand() is called
      return buildSparkSubmitCommand(env);
    }
  }
//org.apache.spark.launcher.SparkSubmitCommandBuilder#buildSparkSubmitCommand
  private List<String> buildSparkSubmitCommand(Map<String, String> env)
      throws IOException, IllegalArgumentException {
    // Load the properties file and check whether spark-submit will be running the app's driver
    // or just launching a cluster app. When running the driver, the JVM's argument will be
    // modified to cover the driver's configuration.
    Map<String, String> config = getEffectiveConfig();
    boolean isClientMode = isClientMode(config);
    //by default under standalone, anything that doesn't specify --deploy-mode cluster is client mode, so this is true
    //in client mode DRIVER_EXTRA_CLASSPATH cannot be set directly in SparkConf, because the driver's JVM has already been
    //started by spark-submit via reflection; it has to be set with the --driver-class-path argument instead
    String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;

    List<String> cmd = buildJavaCommand(extraClassPath);
    // Take Thrift Server as daemon
    if (isThriftServer(mainClass)) {
      addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
    }
    //SPARK_SUBMIT_OPTS is what was mentioned for spark-shell: the Java classpath has to be handed to Scala manually,
    // e.g. SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true"
    addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));

    // We don't want the client to specify Xmx. These have to be set by their corresponding
    // memory flag --driver-memory or configuration entry spark.driver.memory
    String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
    if (!isEmpty(driverExtraJavaOptions) && driverExtraJavaOptions.contains("Xmx")) {
      String msg = String.format("Not allowed to specify max heap(Xmx) memory settings through " +
                   "java options (was %s). Use the corresponding --driver-memory or " +
                   "spark.driver.memory configuration instead.", driverExtraJavaOptions);
      throw new IllegalArgumentException(msg);
    }

    if (isClientMode) {
      // Figuring out where the memory value come from is a little tricky due to precedence.
      // Precedence is observed in the following order:
      // - explicit configuration (setConf()), which also covers --driver-memory cli argument.
      // - properties file.
      // - SPARK_DRIVER_MEMORY env variable
      // - SPARK_MEM env variable
      // - default value (1g)
      // Take Thrift Server as daemon
      String tsMemory =
        isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
      String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
        System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
      cmd.add("-Xmx" + memory);//最大、小堆內存默認是1g
      addOptionString(cmd, driverExtraJavaOptions);
      mergeEnvPathList(env, getLibPathEnvName(),
        config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
    }

    cmd.add("org.apache.spark.deploy.SparkSubmit");
    //buildSparkSubmitArgs() returns a list of the spark-submit options injected above together with their values
    cmd.addAll(buildSparkSubmitArgs());
    return cmd;
  }
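
The driver-memory precedence described in the comments above can be reproduced with a short sketch; firstNonEmpty here is a simplified stand-in for the helper in CommandBuilderUtils, and the concrete values are made up for illustration.

public class DriverMemoryPrecedenceDemo {
  // Simplified stand-in: returns the first candidate that is neither null nor empty.
  static String firstNonEmpty(String... candidates) {
    for (String c : candidates) {
      if (c != null && !c.isEmpty()) {
        return c;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    String tsMemory = null;                  // only set when mainClass is the Thrift Server daemon
    String confMemory = "2g";                // spark.driver.memory from --driver-memory or the properties file
    String envDriverMemory = System.getenv("SPARK_DRIVER_MEMORY");
    String envSparkMem = System.getenv("SPARK_MEM");
    String defaultMem = "1g";                // DEFAULT_MEM

    // Same precedence order as buildSparkSubmitCommand():
    String memory = firstNonEmpty(tsMemory, confMemory, envDriverMemory, envSparkMem, defaultMem);
    System.out.println("-Xmx" + memory);     // prints -Xmx2g with the values above
  }
}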
//org.apache.spark.launcher.SparkSubmitCommandBuilder#buildSparkSubmitArgs
  List<String> buildSparkSubmitArgs() {
    List<String> args = new ArrayList<>();
    SparkSubmitOptionParser parser = new SparkSubmitOptionParser();

    if (!allowsMixedArguments && isAppResourceReq) {
      checkArgument(appResource != null, "Missing application resource.");
    }

    if (verbose) {
      args.add(parser.VERBOSE);
    }

    if (master != null) {
      args.add(parser.MASTER);
      args.add(master);
    }

    if (deployMode != null) {
      args.add(parser.DEPLOY_MODE);
      args.add(deployMode);
    }

    if (appName != null) {
      args.add(parser.NAME);
      args.add(appName);
    }

    for (Map.Entry<String, String> e : conf.entrySet()) {
      args.add(parser.CONF);
      args.add(String.format("%s=%s", e.getKey(), e.getValue()));
    }

    if (propertiesFile != null) {
      args.add(parser.PROPERTIES_FILE);
      args.add(propertiesFile);
    }

    if (isExample) {
      jars.addAll(findExamplesJars());
    }

    if (!jars.isEmpty()) {
      args.add(parser.JARS);
      args.add(join(",", jars));
    }

    if (!files.isEmpty()) {
      args.add(parser.FILES);
      args.add(join(",", files));
    }
...................................................
//The resulting CMD looks like this:
 /usr/java/jdk1.7.0_79/bin/java -cp /opt/cloudera/parcels/SPARK2-2.0.0.cloudera.beta1-1.cdh5.7.0.p0.108015/lib/spark2/conf/:/opt/cloudera/parcels/SPARK2-2.0.0.cloudera.beta1-1.cdh5.7.0.p0.108015/lib/spark2/jars/*:/opt/cloudera/parcels/SPARK2-2.0.0.cloudera.beta1-1.cdh5.7.0.p0.108015/lib/spark2/conf/yarn-conf/ -Dscala.usejavacp=true -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name Spark shell spark-shell
//or
/usr/java/jdk1.7.0_79/bin/java -cp /opt/cloudera/parcels/SPARK2-2.0.0.cloudera.beta1-1.cdh5.7.0.p0.108015/lib/spark2/conf/:/opt/cloudera/parcels/SPARK2-2.0.0.cloudera.beta1-1.cdh5.7.0.p0.108015/lib/spark2/jars/*:/etc/hadoop/:/etc/hadoop/conf.cloudera.yarn/ -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --master yarn --deploy-mode cluster --conf spark.driver.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/lib/* --conf spark.scheduler.mode=FAIR --conf spark.executorEnv.JAVA_HOME=/usr/java/jdk1.8 --conf spark.yarn.appMasterEnv.JAVA_HOME=/usr/java/jdk1.8 --conf spark.yarn.maxAppAttempts=1 --class opHbase.opHbase.TopHbase --name Hbase --verbose --files /etc/hadoop/conf/log4j.properties,/etc/hive/conf/hive-site.xml --jars hdfs://10.8.18.74:8020/ada/spark/share/tech_component/tc.plat.spark.jar,hdfs://10.8.18.74:8020/ada/spark/share/tech_component/bigdata4i-1.0.jar,hdfs://10.8.18.74:8020/ada/spark/share/tech_component/bigdata-sparklog-1.0.jar,hdfs://108474.server.bigdata.com.cn:8020/user/lyy/App/tc.app.test.opHbase-1.0.jar,hdfs://10.8.18.74:8020/ada/spark/share/tech_component/mysql-connector-java-5.1.24-bin.jar hdfs://108474.server.bigdata.com.cn:8020/user/lyy/App/opHbase.opHbase.jar loglevel=ALL path=hdfs://108474.server.bigdata.com.cn:8020/user/lyy/data/hfile hbtab=hbase_tes

 

