Spark Learning Path (16): SparkCore Source Code Reading (2) — the spark-submit Script


1. Overview

The previous post in this series covered Spark's startup scripts. This one analyzes how task submission is handled in the Spark source, following spark-submit step by step into the overall submission flow. In outline, the chain is: spark-submit → find-spark-home (only if SPARK_HOME is unset) → spark-class → org.apache.spark.launcher.Main → org.apache.spark.deploy.SparkSubmit.

2. Source Code Reading

2.1 spark-submit

# -z tests whether the variable that follows is empty (true if empty). Note that
# the shell expands variables inside double quotes but not inside single quotes.
# This block checks whether SPARK_HOME is unset; if so, the then-branch runs.
# source filename: reads and executes the commands in filename within the
# current bash environment.
# $0 is the file name of the script itself, here spark-submit.
# dirname "$0" yields the directory the script lives in.
# $(command) substitutes the output of command.
# So the whole if statement means: if SPARK_HOME has no value, run the
# find-spark-home script in the same directory to set it.
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
# Run the spark-class script, passing org.apache.spark.deploy.SparkSubmit and "$@".
# Here "$@" is the full list of arguments that spark-submit itself received.
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

So the overall logic of the spark-submit script is:
first check whether SPARK_HOME is set; if it is not, source find-spark-home to set it; then exec spark-class, prepending org.apache.spark.deploy.SparkSubmit to the original arguments.
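
For example, a submission like the following (the class and jar names are made up for illustration)

./bin/spark-submit --master local[2] --class com.example.WordCount /path/to/wordcount.jar

is replaced via exec by

"${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit --master local[2] --class com.example.WordCount /path/to/wordcount.jar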

2.2 find-spark-home

# Path to a Python script that can locate SPARK_HOME; its presence next to
# this script indicates a pip-installed PySpark.
FIND_SPARK_HOME_PYTHON_SCRIPT="$(cd "$(dirname "$0")"; pwd)/find_spark_home.py"

# Short circuit if the user already has this set.
# If SPARK_HOME is already non-empty, exit successfully.
if [ ! -z "${SPARK_HOME}" ]; then
   exit 0
# -f tests whether the path exists and is a regular file; if find_spark_home.py
# is not present here, the elif branch below assigns SPARK_HOME directly.
elif [ ! -f "$FIND_SPARK_HOME_PYTHON_SCRIPT" ]; then
  # If we are not in the same directory as find_spark_home.py we are not pip installed so we don't
  # need to search the different Python directories for a Spark installation.
  # Note only that, if the user has pip installed PySpark but is directly calling pyspark-shell or
  # spark-submit in another directory we want to use that version of PySpark rather than the
  # pip installed version of PySpark.
  export SPARK_HOME="$(cd "$(dirname "$0")"/..; pwd)"
else
  # We are pip installed, use the Python script to resolve a reasonable SPARK_HOME
  # Default to standard python interpreter unless told otherwise
  if [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then
     PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"python"}"
  fi
  export SPARK_HOME=$($PYSPARK_DRIVER_PYTHON "$FIND_SPARK_HOME_PYTHON_SCRIPT")
fi

 

As you can see, if the user has not set SPARK_HOME beforehand, this script resolves it automatically and exports it as an environment variable for the later stages: in a regular Spark layout it is simply the parent of the bin/ directory, while for a pip-installed PySpark it is resolved by find_spark_home.py.
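
As a quick illustration of the pip-installed case (the interpreter and the printed path are illustrative), find_spark_home.py lives inside the pyspark package and can be invoked directly:

$ python -c "from pyspark.find_spark_home import _find_spark_home; print(_find_spark_home())"
/usr/local/lib/python3.6/site-packages/pyspark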

Once SPARK_HOME has been set, spark-class is executed. This is the core of the analysis; its source is as follows:

2.3 spark-class

#!/usr/bin/env bash
# Again, make sure SPARK_HOME is set.
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi
# Run load-spark-env.sh, which exports the variables defined in spark-env.sh
# into the environment and determines the Scala version (SPARK_SCALA_VERSION).
. "${SPARK_HOME}"/bin/load-spark-env.sh

# Find the java binary
# Locate the java binary. -n tests whether a variable's length is non-zero.
# If JAVA_HOME is set, use ${JAVA_HOME}/bin/java; otherwise fall back to the
# java on the PATH (command -v java checks that one exists).
if [ -n "${JAVA_HOME}" ]; then
  RUNNER="${JAVA_HOME}/bin/java"
else
  if [ "$(command -v java)" ]; then
    RUNNER="java"
  else
    echo "JAVA_HOME is not set" >&2
    exit 1
  fi
fi

# Find Spark jars.
# -d tests whether the path exists and is a directory.
# Locate the directory containing the Spark jars used for the launch classpath.
if [ -d "${SPARK_HOME}/jars" ]; then
  SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
  SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi

if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
  echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
  echo "You need to build Spark with the target \"package\" before running this program." 1>&2
  exit 1
else
  LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi

# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
  LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi

# For tests
if [[ -n "$SPARK_TESTING" ]]; then
  unset YARN_CONF_DIR
  unset HADOOP_CONF_DIR
fi

# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
# Run the launcher class org.apache.spark.launcher.Main, which parses the
# arguments and prints the fully resolved launch command.
build_command() {
  "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

# Turn off posix mode since it does not allow process substitution
set +o posix
# Assign the arguments parsed by build_command to the CMD array.
CMD=()
while IFS= read -d '' -r ARG; do
  CMD+=("$ARG")
done < <(build_command "$@")

COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}

# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
  echo "${CMD[@]}" | head -n-1 1>&2
  exit 1
fi

if [ $LAUNCHER_EXIT_CODE != 0 ]; then
  exit $LAUNCHER_EXIT_CODE
fi

CMD=("${CMD[@]:0:$LAST}")
# Exec the assembled command; its main class here is org.apache.spark.deploy.SparkSubmit.
exec "${CMD[@]}"

 

The execution logic of spark-class is somewhat more involved; overall it goes like this:

1. Check that SPARK_HOME is set (sourcing find-spark-home if needed).
2. Run load-spark-env.sh to set required environment variables such as the Scala version; this in turn loads spark-env.sh.
3. Locate the java executable and store it in RUNNER.
4. Find the Spark jars and set the classpath variables that reference them.
5. Run the launcher class org.apache.spark.launcher.Main, which parses the arguments and returns the resolved command, read back into CMD.
6. Check the launcher's exit code, i.e. whether the user-supplied arguments were valid.
7. If so, exec CMD, which runs org.apache.spark.deploy.SparkSubmit.
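
To make the last few steps concrete: for a simple client-mode submission, org.apache.spark.launcher.Main prints an ordinary java command, so CMD ends up looking roughly like this (the java path, memory setting, and application details are illustrative and depend on your environment):

/usr/lib/jvm/java-8-openjdk/bin/java -cp "/opt/spark/conf/:/opt/spark/jars/*" -Xmx1g org.apache.spark.deploy.SparkSubmit --master local[2] --class com.example.WordCount /path/to/wordcount.jar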

2.4 SparkSubmit 

The last line of the script in 2.1 hands control to SparkSubmit (source file: D:\src\spark-2.3.0\core\src\main\scala\org\apache\spark\deploy\SparkSubmit.scala):

exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

 

override def main(args: Array[String]): Unit = {
    // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
    // be reset before the application starts.
    val uninitLog = initializeLogIfNecessary(true, silent = true)
    // Grab the arguments passed in by the submit script
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) {
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    }
    // Dispatch to the handler matching the parsed action
    appArgs.action match {
      // Submit the application
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      // Only triggered in standalone and Mesos cluster modes
      case SparkSubmitAction.KILL => kill(appArgs)
      // Only triggered in standalone and Mesos cluster modes
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    }
  }
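
For reference, the KILL and REQUEST_STATUS branches correspond to invocations like the following (standalone cluster mode through the REST gateway; the driver ID is made up):

./bin/spark-submit --master spark://master:6066 --kill driver-20180101000000-0000
./bin/spark-submit --master spark://master:6066 --status driver-20180101000000-0000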

 

2.4.1 submit is the crucial step, and it breaks down into two parts

(1) call prepareSubmitEnvironment, which resolves the arguments, classpath, Spark configuration, and main class for the child process based on the chosen master and deploy mode

(2) call doRunMain, which invokes runMain (optionally as a proxy user) to run that main class, as sketched below
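
A minimal abridged sketch of submit, based on the Spark 2.3.0 source (error handling and the standalone-cluster REST-gateway fallback are elided):

private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // (1) Resolve what the child JVM should run: its arguments, classpath,
  // Spark configuration, and main class, based on master/deploy-mode.
  val (childArgs, childClasspath, sparkConf, childMainClass) =
    prepareSubmitEnvironment(args)

  // (2) Run the resolved main class, optionally as a proxy user.
  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      // runMain is wrapped in UserGroupInformation.doAs(...) here (elided)
    } else {
      runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
    }
  }

  // Standalone cluster mode may first try the REST submission gateway and
  // fall back to the legacy RPC gateway on failure (elided); other modes
  // simply run the main class directly.
  doRunMain()
}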

 

