// org.apache.spark.launcher.Main
public static void main(String[] argsArray) throws Exception {
  checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");
  /*
   * Invoked as:
   *   java -cp spark_home/lib/spark-assembly-1.6.0-hadoop2.6.0.jar org.apache.spark.launcher.Main
   *     org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name "Spark shell" --master spark://ip:7077
   * This main method ultimately hands "org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main
   * --name "Spark shell" --master spark://luyl152:7077" back to spark-class, where exec "${CMD[@]}" runs it.
   */
  List<String> args = new ArrayList<>(Arrays.asList(argsArray));
  // When spark-shell is launched, the first argument is org.apache.spark.deploy.SparkSubmit.
  String className = args.remove(0); // remove the org.apache.spark.deploy.SparkSubmit argument from args
  // In spark-class (or any other config file) you can export SPARK_PRINT_LAUNCH_COMMAND=<any non-empty value>;
  // the launcher will then print cmd, i.e. the value that spark-class's exec "${CMD[@]}" is about to run.
  boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
  AbstractCommandBuilder builder; // create the command builder
  if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
    try {
      // Parse the arguments into the corresponding Spark fields, e.g. the value of --class goes into mainClass.
      // Any extra arguments are collected into the SparkSubmitCommandBuilder member sparkArgs.
      builder = new SparkSubmitCommandBuilder(args);
    } catch (IllegalArgumentException e) {
      printLaunchCommand = false;
      System.err.println("Error: " + e.getMessage());
      System.err.println();

      MainClassOptionParser parser = new MainClassOptionParser();
      try {
        parser.parse(args);
      } catch (Exception ignored) {
        // Ignore parsing exceptions.
      }

      List<String> help = new ArrayList<>();
      if (parser.className != null) {
        help.add(parser.CLASS);
        help.add(parser.className);
      }
      help.add(parser.USAGE_ERROR);
      builder = new SparkSubmitCommandBuilder(help);
    }
  } else {
    // If the first argument is not org.apache.spark.deploy.SparkSubmit, use SparkClassCommandBuilder as the parser.
    builder = new SparkClassCommandBuilder(className, args);
  }

  Map<String, String> env = new HashMap<>();
  List<String> cmd = builder.buildCommand(env); // overridden in SparkSubmitCommandBuilder, not the abstract class's version
  if (printLaunchCommand) {
    System.err.println("Spark Command: " + join(" ", cmd));
    System.err.println("========================================");
  }

  if (isWindows()) {
    System.out.println(prepareWindowsCommand(cmd, env));
  } else {
    // In bash, use NULL as the arg separator since it cannot be used in an argument.
    // Return the final arguments; they are printed to stdout so that spark-class's exec "${CMD[@]}" can run them.
    /*
     * Note that '\0' and a space are not the same thing: '\0' is the string terminator (ASCII 0),
     * while a space is an ordinary, selectable text character (ASCII 32). Programs conventionally
     * use '\0' to mark the end of a string; a space is just another character of the text.
     */
    List<String> bashCmd = prepareBashCommand(cmd, env);
    for (String c : bashCmd) {
      System.out.print(c); // print each command token; spark-class reads these into CMD
      System.out.print('\0');
    }
  }
}
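To see why the launcher terminates each token with '\0' rather than a space, here is a small self-contained sketch (not Spark code; the class name and data are made up for illustration). It emits tokens NUL-separated the same way main() does and then splits them back, showing that an argument containing a space, such as "Spark shell", survives intact:

import java.util.Arrays;
import java.util.List;

// Hypothetical demo class, not part of Spark: shows how NUL-separated tokens
// survive arguments that contain spaces, which is why launcher.Main uses '\0'
// rather than ' ' when handing the command back to spark-class.
public class NulSeparatorDemo {
  public static void main(String[] args) {
    List<String> cmd = Arrays.asList("java", "-Xmx1g",
        "org.apache.spark.deploy.SparkSubmit", "--name", "Spark shell");

    // Emit the tokens the same way launcher.Main does.
    StringBuilder out = new StringBuilder();
    for (String c : cmd) {
      out.append(c).append('\0');
    }

    // A consumer (spark-class does this in bash) can split on NUL and
    // recover "Spark shell" as a single argument.
    String[] recovered = out.toString().split("\0");
    System.out.println(Arrays.toString(recovered));
    // prints: [java, -Xmx1g, org.apache.spark.deploy.SparkSubmit, --name, Spark shell]
  }
}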
// org.apache.spark.launcher.SparkSubmitCommandBuilder
SparkSubmitCommandBuilder(List<String> args) {
  // args looks like: --class org.apache.spark.repl.Main --name "Spark shell" --master spark://ip:7077
  // or: pyspark-shell-main --name PySparkShell
  // or: --master yarn --deploy-mode cluster --name Hbase --verbose --conf xxxx
  // i.e. for PYSPARK_SHELL, SPARKR_SHELL and RUN_EXAMPLE launches, the first argument is that marker name.
  this.allowsMixedArguments = false;
  this.sparkArgs = new ArrayList<>();
  boolean isExample = false;
  List<String> submitArgs = args;

  if (args.size() > 0) {
    switch (args.get(0)) {
      // the first argument is pyspark-shell-main when launched from Python
      case PYSPARK_SHELL:
        this.allowsMixedArguments = true;
        appResource = PYSPARK_SHELL;
        submitArgs = args.subList(1, args.size());
        break;

      case SPARKR_SHELL: // "sparkr-shell-main"
        this.allowsMixedArguments = true;
        appResource = SPARKR_SHELL;
        submitArgs = args.subList(1, args.size());
        break;

      case RUN_EXAMPLE:
        isExample = true;
        submitArgs = args.subList(1, args.size());
    }

    this.isExample = isExample;
    // Assign the values passed in by spark-submit to the corresponding Spark fields,
    // e.g. the value of --class goes into the mainClass field.
    // submitArgs has the PYSPARK_SHELL / SPARKR_SHELL / RUN_EXAMPLE marker stripped off.
    // OptionParser is an inner class of org.apache.spark.launcher.SparkSubmitCommandBuilder:
    //   private class OptionParser extends SparkSubmitOptionParser
    // and overrides the handle* callbacks.
    OptionParser parser = new OptionParser();
    parser.parse(submitArgs); // parse() itself is implemented in the parent class SparkSubmitOptionParser
    this.isAppResourceReq = parser.isAppResourceReq;
  } else {
    this.isExample = isExample;
    this.isAppResourceReq = false;
  }
}
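To make the branching above easier to follow, here is a simplified stand-in (not the real builder; the marker constants and the stripMarker helper are assumptions for illustration). The first element is peeled off only when it is one of the special shell markers, and everything that remains is what the option parser will see:

import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the dispatch in SparkSubmitCommandBuilder's constructor
// (constant values are assumptions based on the comments above).
public class SubmitArgsDispatchDemo {
  static final String PYSPARK_SHELL = "pyspark-shell-main";
  static final String SPARKR_SHELL = "sparkr-shell-main";
  static final String RUN_EXAMPLE = "run-example";

  public static void main(String[] args) {
    List<String> fromPyspark = Arrays.asList(PYSPARK_SHELL, "--name", "PySparkShell");
    List<String> fromSubmit  = Arrays.asList("--class", "org.apache.spark.repl.Main",
                                             "--name", "Spark shell");

    System.out.println(stripMarker(fromPyspark)); // [--name, PySparkShell]
    System.out.println(stripMarker(fromSubmit));  // unchanged, no marker to strip
  }

  // Returns the argument list that would be handed to the option parser.
  static List<String> stripMarker(List<String> args) {
    if (!args.isEmpty()) {
      String first = args.get(0);
      if (PYSPARK_SHELL.equals(first) || SPARKR_SHELL.equals(first)
          || RUN_EXAMPLE.equals(first)) {
        return args.subList(1, args.size());
      }
    }
    return args;
  }
}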
/**
 * Parse a list of spark-submit command line options.
 * <p>
 * See SparkSubmitArguments.scala for a more formal description of available options.
 *
 * @throws IllegalArgumentException If an error is found during parsing.
 *
 * The arguments look like: --class org.apache.spark.repl.Main --name "Spark shell" --master spark://ip:7077.
 * The job of this method is to assign the values passed in by spark-submit to the corresponding
 * Spark fields, e.g. the value of --class goes into the mainClass field.
 * org.apache.spark.launcher.SparkSubmitOptionParser#parse
 */
protected final void parse(List<String> args) {
  // spark-submit can pass SparkConf entries as --conf PROP=VALUE; see the end of
  // org.apache.spark.deploy.SparkSubmitArguments for the full list, or run spark-submit -h.
  Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

  int idx = 0;
  for (idx = 0; idx < args.size(); idx++) {
    String arg = args.get(idx);
    String value = null;

    Matcher m = eqSeparatedOpt.matcher(arg);
    if (m.matches()) {
      arg = m.group(1);   // e.g. --conf
      value = m.group(2); // e.g. PROP=VALUE
    }

    // Look for options with a value.
    // Match arguments that start with "--" (e.g. --class) against the opts table;
    // a match returns the canonical name (e.g. --class), otherwise null.
    String name = findCliOption(arg, opts);
    if (name != null) {
      if (value == null) {
        // Matched an option but no value follows, e.g. a bare --class:
        // args.size() is 1, idx is 0, and 1 - 1 == 0, so this is the last element.
        if (idx == args.size() - 1) {
          throw new IllegalArgumentException(
              String.format("Missing argument for option '%s'.", arg));
        }
        idx++;
        value = args.get(idx); // the element after idx is the option's value
      }
      // name is a spark-submit option such as --class, and value is its argument.
      // OptionParser is an inner class of org.apache.spark.launcher.SparkSubmitCommandBuilder
      // (private class OptionParser extends SparkSubmitOptionParser); its handle() override
      // assigns each value to the matching field, e.g. --class -> mainClass (the implementation is trivial).
      if (!handle(name, value)) { // calls OptionParser's overridden handle()
        break;
      }
      continue;
    }

    // Look for a switch (an option without a value), e.g. --verbose.
    name = findCliOption(arg, switches);
    if (name != null) {
      if (!handle(name, null)) { // calls OptionParser's overridden handle()
        break;
      }
      continue;
    }

    if (!handleUnknown(arg)) { // calls OptionParser's overridden handleUnknown()
      break;
    }
  }

  if (idx < args.size()) {
    idx++;
  }
  // Any remaining arguments are added to sparkArgs (initialized in the SparkSubmitCommandBuilder
  // constructor) via OptionParser's overridden handleExtraArgs().
  handleExtraArgs(args.subList(idx, args.size()));
}
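The least obvious part of parse() is the eqSeparatedOpt pattern, which accepts the --option=value form in addition to the space-separated form. A minimal standalone check of that regex (only the pattern string is taken from the code above; the harness is just for demonstration):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Demo of the "--option=value" splitting done at the top of parse().
public class EqSeparatedOptDemo {
  public static void main(String[] args) {
    Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

    for (String arg : new String[] {
        "--master=spark://ip:7077",          // split into name and value
        "--conf=spark.driver.memory=2g",     // value keeps its own '=' because [^=] only applies to the name
        "--verbose" }) {                     // no '=', left untouched
      Matcher m = eqSeparatedOpt.matcher(arg);
      if (m.matches()) {
        System.out.println(m.group(1) + "  ->  " + m.group(2));
      } else {
        System.out.println(arg + "  ->  (no '=' form; a value, if any, is expected in the next argument)");
      }
    }
  }
}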
// Definitions of the option keys
protected final String CLASS = "--class";
protected final String CONF = "--conf";
protected final String DEPLOY_MODE = "--deploy-mode";
protected final String DRIVER_CLASS_PATH = "--driver-class-path";
protected final String DRIVER_CORES = "--driver-cores";
protected final String DRIVER_JAVA_OPTIONS = "--driver-java-options";
protected final String DRIVER_LIBRARY_PATH = "--driver-library-path";
protected final String DRIVER_MEMORY = "--driver-memory";
protected final String EXECUTOR_MEMORY = "--executor-memory";
protected final String FILES = "--files";
protected final String JARS = "--jars";
protected final String KILL_SUBMISSION = "--kill";
protected final String MASTER = "--master";
protected final String NAME = "--name";
protected final String PACKAGES = "--packages";
protected final String PACKAGES_EXCLUDE = "--exclude-packages";
protected final String PROPERTIES_FILE = "--properties-file";
protected final String PROXY_USER = "--proxy-user";
protected final String PY_FILES = "--py-files";
protected final String REPOSITORIES = "--repositories";
protected final String STATUS = "--status";
protected final String TOTAL_EXECUTOR_CORES = "--total-executor-cores";

// Options that do not take arguments.
protected final String HELP = "--help";
protected final String SUPERVISE = "--supervise";
protected final String USAGE_ERROR = "--usage-error";
protected final String VERBOSE = "--verbose";
protected final String VERSION = "--version";

// Standalone-only options.

// YARN-only options.
protected final String ARCHIVES = "--archives";
protected final String EXECUTOR_CORES = "--executor-cores";
protected final String KEYTAB = "--keytab";
protected final String NUM_EXECUTORS = "--num-executors";
protected final String PRINCIPAL = "--principal";
protected final String QUEUE = "--queue";

/**
 * This is the canonical list of spark-submit options. Each entry in the array contains the
 * different aliases for the same option; the first element of each entry is the "official"
 * name of the option, passed to {@link #handle(String, String)}.
 * <p>
 * Options not listed here nor in the "switch" list below will result in a call to
 * {@link #handleUnknown(String)}.
 * <p>
 * These two arrays are visible for tests.
 */
final String[][] opts = {
  { ARCHIVES },
  { CLASS },
  { CONF, "-c" },
  { DEPLOY_MODE },
  { DRIVER_CLASS_PATH },
  { DRIVER_CORES },
  { DRIVER_JAVA_OPTIONS },
  { DRIVER_LIBRARY_PATH },
  { DRIVER_MEMORY },
  { EXECUTOR_CORES },
  { EXECUTOR_MEMORY },
  { FILES },
  { JARS },
  { KEYTAB },
  { KILL_SUBMISSION },
  { MASTER },
  { NAME },
  { NUM_EXECUTORS },
  { PACKAGES },
  { PACKAGES_EXCLUDE },
  { PRINCIPAL },
  { PROPERTIES_FILE },
  { PROXY_USER },
  { PY_FILES },
  { QUEUE },
  { REPOSITORIES },
  { STATUS },
  { TOTAL_EXECUTOR_CORES },
};

/**
 * List of switches (command line options that do not take parameters) recognized by spark-submit.
 */
final String[][] switches = {
  { HELP, "-h" },
  { SUPERVISE },
  { USAGE_ERROR },
  { VERBOSE, "-v" },
  { VERSION },
};
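findCliOption itself is not shown here, but its behaviour follows from the two arrays: scan each entry, and if the argument equals any alias in the entry, return the entry's first ("official") name. A rough sketch of that lookup, inferred from the array layout above (the real implementation may differ in detail):

// Sketch of the alias lookup that parse() relies on; the behaviour is inferred
// from the opts/switches layout above, so treat it as an approximation.
public class CliOptionLookupDemo {
  static final String[][] OPTS = {
    { "--conf", "-c" },
    { "--class" },
    { "--master" },
  };

  public static void main(String[] args) {
    System.out.println(findCliOption("-c", OPTS));      // --conf (alias resolves to the official name)
    System.out.println(findCliOption("--class", OPTS)); // --class
    System.out.println(findCliOption("--bogus", OPTS)); // null -> parse() falls through to handleUnknown()
  }

  static String findCliOption(String name, String[][] available) {
    for (String[] candidates : available) {
      for (String candidate : candidates) {
        if (candidate.equals(name)) {
          return candidates[0]; // first element is the canonical option name
        }
      }
    }
    return null;
  }
}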
private class OptionParser extends SparkSubmitOptionParser {

  boolean isAppResourceReq = true;

  /**
   * Assigns the values passed in by spark-submit to the corresponding Spark fields.
   */
  @Override
  protected boolean handle(String opt, String value) {
    switch (opt) {
      case MASTER:
        master = value;
        break;
      case DEPLOY_MODE:
        deployMode = value;
        break;
      case PROPERTIES_FILE:
        propertiesFile = value;
        break;
      case DRIVER_MEMORY:
        conf.put(SparkLauncher.DRIVER_MEMORY, value);
        break;
      case DRIVER_JAVA_OPTIONS:
        conf.put(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS, value);
        break;
      case DRIVER_LIBRARY_PATH:
        conf.put(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH, value);
        break;
      case DRIVER_CLASS_PATH:
        conf.put(SparkLauncher.DRIVER_EXTRA_CLASSPATH, value);
        break;
      case CONF:
        String[] setConf = value.split("=", 2);
        checkArgument(setConf.length == 2, "Invalid argument to %s: %s", CONF, value);
        conf.put(setConf[0], setConf[1]);
        break;
      case CLASS:
        // The special classes require some special command line handling, since they allow
        // mixing spark-submit arguments with arguments that should be propagated to the shell
        // itself. Note that for this to work, the "--class" argument must come before any
        // non-spark-submit arguments.
        mainClass = value;
        if (specialClasses.containsKey(value)) {
          allowsMixedArguments = true;
          appResource = specialClasses.get(value);
        }
        break;
      // ... remaining cases omitted ...
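A detail worth calling out in the CONF case above is the limit argument to split: only the first '=' separates the key from the value, so configuration values that themselves contain '=' are preserved. A tiny demonstration:

// Why handle() uses split("=", 2) for --conf: the value itself may contain '=',
// e.g. in java options, so only the first '=' separates key from value.
public class ConfSplitDemo {
  public static void main(String[] args) {
    String value = "spark.driver.extraJavaOptions=-Dfoo=bar -Dlog=DEBUG";
    String[] setConf = value.split("=", 2);
    System.out.println("key   = " + setConf[0]); // spark.driver.extraJavaOptions
    System.out.println("value = " + setConf[1]); // -Dfoo=bar -Dlog=DEBUG
  }
}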
// org.apache.spark.launcher.SparkSubmitCommandBuilder#buildCommand
@Override
public List<String> buildCommand(Map<String, String> env)
    throws IOException, IllegalArgumentException {
  // PYSPARK_SHELL_RESOURCE marks a Python shell, SPARKR_SHELL_RESOURCE an R shell.
  if (PYSPARK_SHELL.equals(appResource) && isAppResourceReq) {
    return buildPySparkShellCommand(env);
  } else if (SPARKR_SHELL.equals(appResource) && isAppResourceReq) {
    return buildSparkRCommand(env);
  } else {
    // env is an empty map at this point; this branch calls buildSparkSubmitCommand().
    return buildSparkSubmitCommand(env);
  }
}
// org.apache.spark.launcher.SparkSubmitCommandBuilder#buildSparkSubmitCommand
private List<String> buildSparkSubmitCommand(Map<String, String> env)
    throws IOException, IllegalArgumentException {
  // Load the properties file and check whether spark-submit will be running the app's driver
  // or just launching a cluster app. When running the driver, the JVM's argument will be
  // modified to cover the driver's configuration.
  Map<String, String> config = getEffectiveConfig();
  // In standalone mode, unless --deploy-mode cluster is given, the mode is client, so this is true by default.
  boolean isClientMode = isClientMode(config);
  // In client mode, DRIVER_EXTRA_CLASSPATH cannot be set directly in SparkConf, because the driver JVM
  // has already been started by spark-submit (via reflection); it must be passed as --driver-class-path.
  String extraClassPath = isClientMode ? config.get(SparkLauncher.DRIVER_EXTRA_CLASSPATH) : null;

  List<String> cmd = buildJavaCommand(extraClassPath);
  // Take Thrift Server as daemon
  if (isThriftServer(mainClass)) {
    addOptionString(cmd, System.getenv("SPARK_DAEMON_JAVA_OPTS"));
  }
  // SPARK_SUBMIT_OPTS is the variable mentioned for spark-shell: the Java classpath has to be handed
  // to Scala manually, e.g. SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Dscala.usejavacp=true".
  addOptionString(cmd, System.getenv("SPARK_SUBMIT_OPTS"));

  // We don't want the client to specify Xmx. These have to be set by their corresponding
  // memory flag --driver-memory or configuration entry spark.driver.memory
  String driverExtraJavaOptions = config.get(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS);
  if (!isEmpty(driverExtraJavaOptions) && driverExtraJavaOptions.contains("Xmx")) {
    String msg = String.format("Not allowed to specify max heap(Xmx) memory settings through " +
        "java options (was %s). Use the corresponding --driver-memory or " +
        "spark.driver.memory configuration instead.", driverExtraJavaOptions);
    throw new IllegalArgumentException(msg);
  }

  if (isClientMode) {
    // Figuring out where the memory value come from is a little tricky due to precedence.
    // Precedence is observed in the following order:
    // - explicit configuration (setConf()), which also covers --driver-memory cli argument.
    // - properties file.
    // - SPARK_DRIVER_MEMORY env variable
    // - SPARK_MEM env variable
    // - default value (1g)
    // Take Thrift Server as daemon
    String tsMemory =
        isThriftServer(mainClass) ? System.getenv("SPARK_DAEMON_MEMORY") : null;
    String memory = firstNonEmpty(tsMemory, config.get(SparkLauncher.DRIVER_MEMORY),
        System.getenv("SPARK_DRIVER_MEMORY"), System.getenv("SPARK_MEM"), DEFAULT_MEM);
    cmd.add("-Xmx" + memory); // the max heap defaults to 1g
    addOptionString(cmd, driverExtraJavaOptions);
    mergeEnvPathList(env, getLibPathEnvName(),
        config.get(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH));
  }

  cmd.add("org.apache.spark.deploy.SparkSubmit");
  // buildSparkSubmitArgs() returns the spark-submit options parsed earlier, re-serialized as a list.
  cmd.addAll(buildSparkSubmitArgs());
  return cmd;
}
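The driver-memory precedence spelled out in the comment above is just a left-to-right "first non-empty wins" scan. The sketch below re-implements that idea standalone (the real firstNonEmpty helper lives in Spark's launcher utilities; this version is only illustrative):

// Illustrative re-implementation of the "first non-empty wins" precedence used
// to pick the driver -Xmx value; not the actual Spark helper.
public class DriverMemoryPrecedenceDemo {
  public static void main(String[] args) {
    String fromConf = null;                                   // no --driver-memory / spark.driver.memory set
    String fromEnv  = System.getenv("SPARK_DRIVER_MEMORY");   // possibly null
    String fromMem  = System.getenv("SPARK_MEM");             // possibly null
    String memory = firstNonEmpty(fromConf, fromEnv, fromMem, "1g"); // "1g" stands in for DEFAULT_MEM
    System.out.println("-Xmx" + memory);
  }

  static String firstNonEmpty(String... candidates) {
    for (String c : candidates) {
      if (c != null && !c.isEmpty()) {
        return c;
      }
    }
    return null;
  }
}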
// org.apache.spark.launcher.SparkSubmitCommandBuilder#buildSparkSubmitArgs
List<String> buildSparkSubmitArgs() {
  List<String> args = new ArrayList<>();
  SparkSubmitOptionParser parser = new SparkSubmitOptionParser();

  if (!allowsMixedArguments && isAppResourceReq) {
    checkArgument(appResource != null, "Missing application resource.");
  }

  if (verbose) {
    args.add(parser.VERBOSE);
  }

  if (master != null) {
    args.add(parser.MASTER);
    args.add(master);
  }

  if (deployMode != null) {
    args.add(parser.DEPLOY_MODE);
    args.add(deployMode);
  }

  if (appName != null) {
    args.add(parser.NAME);
    args.add(appName);
  }

  for (Map.Entry<String, String> e : conf.entrySet()) {
    args.add(parser.CONF);
    args.add(String.format("%s=%s", e.getKey(), e.getValue()));
  }

  if (propertiesFile != null) {
    args.add(parser.PROPERTIES_FILE);
    args.add(propertiesFile);
  }

  if (isExample) {
    jars.addAll(findExamplesJars());
  }

  if (!jars.isEmpty()) {
    args.add(parser.JARS);
    args.add(join(",", jars));
  }

  if (!files.isEmpty()) {
    args.add(parser.FILES);
    args.add(join(",", files));
  }

  // ... remaining options omitted ...
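Putting it together, the list returned here is simply the parsed state re-serialized as spark-submit options. The following standalone sketch mimics that re-serialization for a typical shell launch (all values are illustrative, not taken from a real run):

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative only: re-serializing parsed state back into spark-submit options,
// the way buildSparkSubmitArgs() does with its parser constants.
public class RebuildSubmitArgsDemo {
  public static void main(String[] args) {
    String master = "spark://ip:7077";
    String appName = "Spark shell";
    Map<String, String> conf = new LinkedHashMap<>();
    conf.put("spark.driver.memory", "2g");

    List<String> out = new ArrayList<>();
    if (master != null) { out.add("--master"); out.add(master); }
    if (appName != null) { out.add("--name"); out.add(appName); }
    for (Map.Entry<String, String> e : conf.entrySet()) {
      out.add("--conf");
      out.add(e.getKey() + "=" + e.getValue());
    }
    System.out.println(out);
    // [--master, spark://ip:7077, --name, Spark shell, --conf, spark.driver.memory=2g]
  }
}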
// The resulting CMD looks like this:
/usr/java/jdk1.7.0_79/bin/java -cp /opt/cloudera/parcels/SPARK2-2.0.0.cloudera.beta1-1.cdh5.7.0.p0.108015/lib/spark2/conf/:/opt/cloudera/parcels/SPARK2-2.0.0.cloudera.beta1-1.cdh5.7.0.p0.108015/lib/spark2/jars/*:/opt/cloudera/parcels/SPARK2-2.0.0.cloudera.beta1-1.cdh5.7.0.p0.108015/lib/spark2/conf/yarn-conf/ -Dscala.usejavacp=true -Xmx1g -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --class org.apache.spark.repl.Main --name Spark shell spark-shell

// or:
/usr/java/jdk1.7.0_79/bin/java -cp /opt/cloudera/parcels/SPARK2-2.0.0.cloudera.beta1-1.cdh5.7.0.p0.108015/lib/spark2/conf/:/opt/cloudera/parcels/SPARK2-2.0.0.cloudera.beta1-1.cdh5.7.0.p0.108015/lib/spark2/jars/*:/etc/hadoop/:/etc/hadoop/conf.cloudera.yarn/ -XX:MaxPermSize=256m org.apache.spark.deploy.SparkSubmit --master yarn --deploy-mode cluster --conf spark.driver.extraClassPath=/opt/cloudera/parcels/CDH/lib/hbase/lib/* --conf spark.scheduler.mode=FAIR --conf spark.executorEnv.JAVA_HOME=/usr/java/jdk1.8 --conf spark.yarn.appMasterEnv.JAVA_HOME=/usr/java/jdk1.8 --conf spark.yarn.maxAppAttempts=1 --class opHbase.opHbase.TopHbase --name Hbase --verbose --files /etc/hadoop/conf/log4j.properties,/etc/hive/conf/hive-site.xml --jars hdfs://10.8.18.74:8020/ada/spark/share/tech_component/tc.plat.spark.jar,hdfs://10.8.18.74:8020/ada/spark/share/tech_component/bigdata4i-1.0.jar,hdfs://10.8.18.74:8020/ada/spark/share/tech_component/bigdata-sparklog-1.0.jar,hdfs://108474.server.bigdata.com.cn:8020/user/lyy/App/tc.app.test.opHbase-1.0.jar,hdfs://10.8.18.74:8020/ada/spark/share/tech_component/mysql-connector-java-5.1.24-bin.jar hdfs://108474.server.bigdata.com.cn:8020/user/lyy/App/opHbase.opHbase.jar loglevel=ALL path=hdfs://108474.server.bigdata.com.cn:8020/user/lyy/data/hfile hbtab=hbase_tes