1. Startup Script Analysis
In standalone deployment mode the cluster consists of a master and slaves. The master can use ZooKeeper for high availability, persisting its driver, worker, and application state to ZooKeeper; the slaves are one or more hosts. A driver obtains its execution environment by requesting resources from the master.
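For reference, ZooKeeper-based master recovery is normally switched on through SPARK_DAEMON_JAVA_OPTS in spark-env.sh; a minimal sketch follows, in which the ZooKeeper hosts (zk1/zk2/zk3) and the /spark znode path are illustrative placeholders:
# spark-env.sh (sketch): enable ZooKeeper recovery for the Master; hostnames and znode path are examples
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1:2181,zk2:2181,zk3:2181 -Dspark.deploy.zookeeper.dir=/spark"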
Starting the master and slaves means running start-master.sh and start-slaves.sh from the /usr/dahua/spark/sbin directory, or running
start-all.sh, which essentially just calls start-master.sh and start-slaves.sh.
1.1 start-all.sh
#1. If SPARK_HOME is unset, set it to the parent directory of this script's directory
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
#2. Source ${SPARK_HOME}/sbin/spark-config.sh (analyzed below)
. "${SPARK_HOME}/sbin/spark-config.sh"
#3. Run "${SPARK_HOME}/sbin"/start-master.sh (analyzed below)
"${SPARK_HOME}/sbin"/start-master.sh
#4. Run "${SPARK_HOME}/sbin"/start-slaves.sh (analyzed below)
"${SPARK_HOME}/sbin"/start-slaves.sh
start-master.sh and start-slave.sh ultimately launch
org.apache.spark.deploy.master.Master and org.apache.spark.deploy.worker.Worker respectively.
1.2 start-master.sh
start-master.sh calls spark-daemon.sh; note that the class to launch is specified here.
#1. If SPARK_HOME is unset, set it to the parent directory of this script's directory
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
# NOTE: This exact class name is matched downstream by SparkSubmit.
# Any changes need to be reflected there.
#2. Set CLASS="org.apache.spark.deploy.master.Master"
CLASS="org.apache.spark.deploy.master.Master"
#3. If the arguments end with --help or -h, print usage information and exit
if [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
echo "Usage: ./sbin/start-master.sh [options]"
pattern="Usage:"
pattern+="\|Using Spark's default log4j profile:"
pattern+="\|Registered signal handlers for"
"${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
exit 1
fi
#4. Save all of the arguments in ORIGINAL_ARGS
ORIGINAL_ARGS="$@"
#5. Source ${SPARK_HOME}/sbin/spark-config.sh
. "${SPARK_HOME}/sbin/spark-config.sh"
#6. Source ${SPARK_HOME}/bin/load-spark-env.sh
. "${SPARK_HOME}/bin/load-spark-env.sh"
#7. If SPARK_MASTER_PORT is empty, default it to 7077
if [ "$SPARK_MASTER_PORT" = "" ]; then
SPARK_MASTER_PORT=7077
fi
#8. If SPARK_MASTER_HOST is empty, default it to the local hostname (hostname -f)
if [ "$SPARK_MASTER_HOST" = "" ]; then
case `uname` in
(SunOS)
SPARK_MASTER_HOST="`/usr/sbin/check-hostname | awk '{print $NF}'`"
;;
(*)
SPARK_MASTER_HOST="`hostname -f`"
;;
esac
fi
#9. If SPARK_MASTER_WEBUI_PORT is empty, default it to 8080
if [ "$SPARK_MASTER_WEBUI_PORT" = "" ]; then
SPARK_MASTER_WEBUI_PORT=8080
fi
#10. Launch the daemon via spark-daemon.sh
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS 1 \
--host $SPARK_MASTER_HOST --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
$ORIGINAL_ARGS
Step 10 is clearly the key part. Before analyzing it, let's see what steps 5 and 6 do, and then write out the fully expanded command that step 10 runs.
1.3 spark-config.sh (step 5 of 1.2)
# If SPARK_HOME is unset, set it to the parent directory of this script's directory
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
# If SPARK_CONF_DIR is already set, use it; otherwise use ${SPARK_HOME}/conf
export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}/conf"}"
# Add the PySpark classes to the PYTHONPATH:
if [ -z "${PYSPARK_PYTHONPATH_SET}" ]; then
export PYTHONPATH="${SPARK_HOME}/python:${PYTHONPATH}"
export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.6-src.zip:${PYTHONPATH}"
export PYSPARK_PYTHONPATH_SET=1
fi
1.4 load-spark-env.sh (step 6 of 1.2)
#1. If SPARK_HOME is unset, source find-spark-home to set it
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
#2. If SPARK_ENV_LOADED is unset, set it to 1
if [ -z "$SPARK_ENV_LOADED" ]; then
export SPARK_ENV_LOADED=1
#3. Set SPARK_CONF_DIR to its current value, or default to ${SPARK_HOME}/conf
export SPARK_CONF_DIR="${SPARK_CONF_DIR:-"${SPARK_HOME}"/conf}"
#4. If "${SPARK_CONF_DIR}/spark-env.sh" exists, source it [note: set -a exports every variable assigned while it is in effect, and set +a turns that off; see the sketch after this section]
if [ -f "${SPARK_CONF_DIR}/spark-env.sh" ]; then
# Promote all variable declarations to environment (exported) variables
set -a
. "${SPARK_CONF_DIR}/spark-env.sh"
set +a
fi
fi
# Setting SPARK_SCALA_VERSION if not already set.
#5. Determine the Scala version: if assembly builds for both 2.11 and 2.12 exist, print an error and exit; otherwise use 2.11 if its assembly directory exists, else 2.12
if [ -z "$SPARK_SCALA_VERSION" ]; then
ASSEMBLY_DIR2="${SPARK_HOME}/assembly/target/scala-2.11"
ASSEMBLY_DIR1="${SPARK_HOME}/assembly/target/scala-2.12"
if [[ -d "$ASSEMBLY_DIR2" && -d "$ASSEMBLY_DIR1" ]]; then
echo -e "Presence of build for multiple Scala versions detected." 1>&2
echo -e 'Either clean one of them or, export SPARK_SCALA_VERSION in spark-env.sh.' 1>&2
exit 1
fi
if [ -d "$ASSEMBLY_DIR2" ]; then
export SPARK_SCALA_VERSION="2.11"
else
export SPARK_SCALA_VERSION="2.12"
fi
fi
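A quick note on the set -a / set +a pair used in step 4: set -a marks every variable assigned afterwards for export, so everything defined in spark-env.sh becomes an environment variable, and set +a turns that behavior off again. A minimal standalone illustration (my-env.sh is a made-up file used only for this demo):
#!/usr/bin/env bash
# Demonstrate how "set -a" exports variables assigned while sourcing a file
echo 'FOO=bar' > /tmp/my-env.sh          # stand-in for spark-env.sh

set -a                                   # auto-export every variable assigned from now on
. /tmp/my-env.sh                         # FOO is defined here and exported automatically
set +a                                   # stop auto-exporting

bash -c 'echo "child process sees FOO=$FOO"'   # prints: child process sees FOO=bar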
1.5 spark-env.sh
Lists the configuration options for the various deployment modes.
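For the standalone mode discussed here, a typical spark-env.sh might contain entries like the sketch below; all values are illustrative, while the variable names themselves are the standard ones read by the scripts above:
# spark-env.sh (sketch, example values only)
export JAVA_HOME=/usr/java/default
export SPARK_MASTER_HOST=master01           # host the Master binds to (step 8 of 1.2)
export SPARK_MASTER_PORT=7077               # default applied in step 7 of 1.2
export SPARK_MASTER_WEBUI_PORT=8080         # default applied in step 9 of 1.2
export SPARK_WORKER_CORES=4                 # cores offered by each Worker
export SPARK_WORKER_MEMORY=8g               # memory offered by each Worker
export SPARK_WORKER_INSTANCES=1             # Worker processes per node (see 1.9)
export SPARK_LOG_DIR=/var/log/spark         # used by spark-daemon.sh (step 10 of 1.6)
export SPARK_PID_DIR=/var/run/spark         # used by spark-daemon.sh (step 11 of 1.6)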
1.6 spark-daemon.sh
Coming back to step 10 of 1.2, the fully expanded command is:
sbin/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --host hostname --port 7077 --webui-port 8080
Everything up to this point has just been setting variables; now we finally reach the main event. Let's continue with the spark-daemon.sh script.
#1. If the number of arguments is less than or equal to 1, print usage and exit
if [ $# -le 1 ]; then
echo $usage
exit 1
fi
#2. If SPARK_HOME is unset, set it to the parent directory of this script's directory
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
#3. Source ${SPARK_HOME}/sbin/spark-config.sh, as analyzed above [Isn't this duplicated across scripts? Yes, because some people launch services directly with spark-daemon.sh, and re-setting the variables costs nothing]
. "${SPARK_HOME}/sbin/spark-config.sh"
# get arguments
# Check if --config is passed as an argument. It is an optional parameter.
# Exit if the argument is not a directory.
#4. If the first argument is --config, shift and take the next argument as the config directory; if that directory does not exist, print an error and exit, otherwise set SPARK_CONF_DIR to it and shift to the next argument
#[Note: --config is only recognized as the first argument]
if [ "$1" == "--config" ]
then
shift
conf_dir="$1"
if [ ! -d "$conf_dir" ]
then
echo "ERROR : $conf_dir is not a directory"
echo $usage
exit 1
else
export SPARK_CONF_DIR="$conf_dir"
fi
shift
fi
#5. Set option, command and instance to the next three arguments (e.g. option=start, command=org.apache.spark.deploy.master.Master, instance=1)
#[Note: a common reason spark-daemon.sh fails to start a service is an incomplete class name; see the usage example after this script]
option=$1
shift
command=$1
shift
instance=$1
shift
#6. Log rotation function: renames old logs (log --> log.1, etc.); skipped
spark_rotate_log ()
{
log=$1;
num=5;
if [ -n "$2" ]; then
num=$2
fi
if [ -f "$log" ]; then # rotate logs
while [ $num -gt 1 ]; do
prev=`expr $num - 1`
[ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
num=$prev
done
mv "$log" "$log.$num";
fi
}
#7. Source ${SPARK_HOME}/bin/load-spark-env.sh, as analyzed above
. "${SPARK_HOME}/bin/load-spark-env.sh"
#8. If SPARK_IDENT_STRING is unset, set it to $USER (the Linux user)
if [ "$SPARK_IDENT_STRING" = "" ]; then
export SPARK_IDENT_STRING="$USER"
fi
#9. Set SPARK_PRINT_LAUNCH_COMMAND=1
export SPARK_PRINT_LAUNCH_COMMAND="1"
# get log directory
#10. If SPARK_LOG_DIR is unset, set it to ${SPARK_HOME}/logs; create the directory, test that a file can be created in it, and change its ownership if the test fails
if [ "$SPARK_LOG_DIR" = "" ]; then
export SPARK_LOG_DIR="${SPARK_HOME}/logs"
fi
mkdir -p "$SPARK_LOG_DIR"
touch "$SPARK_LOG_DIR"/.spark_test > /dev/null 2>&1
TEST_LOG_DIR=$?
if [ "${TEST_LOG_DIR}" = "0" ]; then
rm -f "$SPARK_LOG_DIR"/.spark_test
else
chown "$SPARK_IDENT_STRING" "$SPARK_LOG_DIR"
fi
#11. If SPARK_PID_DIR is unset, set it to /tmp
if [ "$SPARK_PID_DIR" = "" ]; then
SPARK_PID_DIR=/tmp
fi
# some variables
#12. Set the log and pid file paths
log="$SPARK_LOG_DIR/spark-$SPARK_IDENT_STRING-$command-$instance-$HOSTNAME.out"
pid="$SPARK_PID_DIR/spark-$SPARK_IDENT_STRING-$command-$instance.pid"
# Set default scheduling priority
#13. If SPARK_NICENESS is unset, set it to 0 [note: the scheduling priority, used below]
if [ "$SPARK_NICENESS" = "" ]; then
export SPARK_NICENESS=0
fi
#14. The execute_command() function; we skip it for now and analyze it when it is called
execute_command() {
if [ -z ${SPARK_NO_DAEMONIZE+set} ]; then
nohup -- "$@" >> $log 2>&1 < /dev/null &
newpid="$!"
echo "$newpid" > "$pid"
# Poll for up to 5 seconds for the java process to start
for i in {1..10}
do
if [[ $(ps -p "$newpid" -o comm=) =~ "java" ]]; then
break
fi
sleep 0.5
done
sleep 2
# Check if the process has died; in that case we'll tail the log so the user can see
if [[ ! $(ps -p "$newpid" -o comm=) =~ "java" ]]; then
echo "failed to launch: $@"
tail -10 "$log" | sed 's/^/ /'
echo "full log in $log"
fi
else
"$@"
fi
}
#15. Enter the case statement on option; we take the start branch as an example
# It runs run_command class "$@", where "$@" is empty at this point. (Verified: if you start with extra arguments here, the same arguments must be passed when stopping, otherwise the stop fails; their role is analyzed later.)
# Now we step into the run_command() function:
# I. Set mode=class, create SPARK_PID_DIR, and check whether the pid file defined above already exists (refusing to start if its process is still running)
# II. If SPARK_MASTER is non-empty, rsync the installation from it, deleting extraneous files
# III. Rotate the log file
# IV. Enter the case statement; with command=org.apache.spark.deploy.master.Master it ultimately executes
# nohup nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class $command "$@" >> "$log" 2>&1 < /dev/null &
# newpid="$!"
# echo "$newpid" > "$pid"
# The focus now shifts to bin/spark-class org.apache.spark.deploy.master.Master
run_command() {
mode="$1"
shift
mkdir -p "$SPARK_PID_DIR"
if [ -f "$pid" ]; then
TARGET_ID="$(cat "$pid")"
if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
echo "$command running as process $TARGET_ID. Stop it first."
exit 1
fi
fi
if [ "$SPARK_MASTER" != "" ]; then
echo rsync from "$SPARK_MASTER"
rsync -a -e ssh --delete --exclude=.svn --exclude='logs/*' --exclude='contrib/hod/logs/*' "$SPARK_MASTER/" "${SPARK_HOME}"
fi
spark_rotate_log "$log"
echo "starting $command, logging to $log"
case "$mode" in
(class)
execute_command nice -n "$SPARK_NICENESS" "${SPARK_HOME}"/bin/spark-class "$command" "$@"
;;
(submit)
execute_command nice -n "$SPARK_NICENESS" bash "${SPARK_HOME}"/bin/spark-submit --class "$command" "$@"
;;
(*)
echo "unknown mode: $mode"
exit 1
;;
esac
}
case $option in
(submit)
run_command submit "$@"
;;
(start)
run_command class "$@"
;;
(stop)
if [ -f $pid ]; then
TARGET_ID="$(cat "$pid")"
if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
echo "stopping $command"
kill "$TARGET_ID" && rm -f "$pid"
else
echo "no $command to stop"
fi
else
echo "no $command to stop"
fi
;;
(status)
if [ -f $pid ]; then
TARGET_ID="$(cat "$pid")"
if [[ $(ps -p "$TARGET_ID" -o comm=) =~ "java" ]]; then
echo $command is running.
exit 0
else
echo $pid file is present but $command not running
exit 1
fi
else
echo $command not running.
exit 2
fi
;;
(*)
echo $usage
exit 1
;;
esac
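As noted at step 5, spark-daemon.sh needs the option, the fully qualified class name, and an instance number; a hand-run sketch (the host name master01 is a placeholder) looks like this:
# Start, check and stop a Master by hand via spark-daemon.sh;
# the class name must be fully qualified and the instance number must match on stop
sbin/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 --host master01 --port 7077 --webui-port 8080
sbin/spark-daemon.sh status org.apache.spark.deploy.master.Master 1
sbin/spark-daemon.sh stop org.apache.spark.deploy.master.Master 1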
1.7 spark-class
#1. If SPARK_HOME is unset, source find-spark-home to set it
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
#2. Source ${SPARK_HOME}/bin/load-spark-env.sh, as analyzed above
. "${SPARK_HOME}"/bin/load-spark-env.sh
# Find the java binary
#3. If JAVA_HOME is set, use RUNNER="${JAVA_HOME}/bin/java"; otherwise fall back to java on the PATH; if neither is available, report that JAVA_HOME is not set and exit
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ "$(command -v java)" ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi
# Find Spark jars.
#4. Determine SPARK_JARS_DIR: if ${SPARK_HOME}/jars exists, use it; otherwise use
#SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
if [ -d "${SPARK_HOME}/jars" ]; then
SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi
#5. If SPARK_JARS_DIR does not exist and neither SPARK_TESTING nor SPARK_SQL_TESTING is set [note: we normally set neither], report an error and exit; otherwise set LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
echo "You need to build Spark with the target \"package\" before running this program." 1>&2
exit 1
else
LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi
# Add the launcher build dir to the classpath if requested.
#6. If SPARK_PREPEND_CLASSES is non-empty, prepend the launcher build directory to the classpath:
#LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
# For tests
#7. If SPARK_TESTING is non-empty, unset YARN_CONF_DIR and HADOOP_CONF_DIR; presumably only relevant for tests
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
fi
#8. The build_command function; see step 9 below
build_command() {
"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf "%d\0" $?
}
# Turn off posix mode since it does not allow process substitution
set +o posix
CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
#9. Ultimately this runs "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@",
#i.e. java -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
#Control passes to the Java class org.apache.spark.launcher.Main, which is the Java entry point
done < <(build_command "$@")
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}
# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
echo "${CMD[@]}" | head -n-1 1>&2
exit 1
fi
if [ $LAUNCHER_EXIT_CODE != 0 ]; then
exit $LAUNCHER_EXIT_CODE
fi
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
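A handy way to see the final command this chain produces is the SPARK_PRINT_LAUNCH_COMMAND variable that spark-daemon.sh exports in step 9 of 1.6: when it is set, the launcher prints the assembled java command (a "Spark Command:" line on stderr, which also lands at the top of the daemon log) before it is executed. A sketch, with a placeholder host name, that runs the Master in the foreground:
# Print the java command spark-class builds for the Master, then run it as usual
SPARK_PRINT_LAUNCH_COMMAND=1 bin/spark-class org.apache.spark.deploy.master.Master --host master01 --port 7077 --webui-port 8080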
1.8 start-slaves.sh
#1. If SPARK_HOME is unset, set it to the parent directory of this script's directory
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
#2. Source ${SPARK_HOME}/sbin/spark-config.sh, as analyzed above
. "${SPARK_HOME}/sbin/spark-config.sh"
#3. Source ${SPARK_HOME}/bin/load-spark-env.sh, as analyzed above
. "${SPARK_HOME}/bin/load-spark-env.sh"
# Find the port number for the master
#4. If SPARK_MASTER_PORT is empty, default it to 7077
if [ "$SPARK_MASTER_PORT" = "" ]; then
SPARK_MASTER_PORT=7077
fi
#5. If SPARK_MASTER_HOST is empty, default it to the local hostname (hostname -f)
if [ "$SPARK_MASTER_HOST" = "" ]; then
case `uname` in
(SunOS)
SPARK_MASTER_HOST="`/usr/sbin/check-hostname | awk '{print $NF}'`"
;;
(*)
SPARK_MASTER_HOST="`hostname -f`"
;;
esac
fi
# Launch the slaves
#6. Launch the slaves:
# "${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-slave.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"
# slaves.sh iterates over the hosts listed in conf/slaves (honoring SPARK_SSH_OPTS) and runs "${SPARK_HOME}/sbin/start-slave.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT" on each via ssh
"${SPARK_HOME}/sbin/slaves.sh" cd "${SPARK_HOME}" \; "${SPARK_HOME}/sbin/start-slave.sh" "spark://$SPARK_MASTER_HOST:$SPARK_MASTER_PORT"
1.9 start-slave.sh
#1. If SPARK_HOME is unset, set it to the parent directory of this script's directory
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
#2. Set CLASS="org.apache.spark.deploy.worker.Worker"
CLASS="org.apache.spark.deploy.worker.Worker"
#3. If no arguments are given, or the arguments end with --help or -h, print usage information and exit
if [[ $# -lt 1 ]] || [[ "$@" = *--help ]] || [[ "$@" = *-h ]]; then
echo "Usage: ./sbin/start-slave.sh [options] <master>"
pattern="Usage:"
pattern+="\|Using Spark's default log4j profile:"
pattern+="\|Registered signal handlers for"
"${SPARK_HOME}"/bin/spark-class $CLASS --help 2>&1 | grep -v "$pattern" 1>&2
exit 1
fi
#4. Source ${SPARK_HOME}/sbin/spark-config.sh, as analyzed above
. "${SPARK_HOME}/sbin/spark-config.sh"
#5. Source ${SPARK_HOME}/bin/load-spark-env.sh, as analyzed above
. "${SPARK_HOME}/bin/load-spark-env.sh"
#6. MASTER=$1; here MASTER=spark://hostname:7077, then shift. In other words, to start a single slave by itself run: start-slave.sh spark://hostname:7077
MASTER=$1
shift
#7. If SPARK_WORKER_WEBUI_PORT is empty, default it to 8081
if [ "$SPARK_WORKER_WEBUI_PORT" = "" ]; then
SPARK_WORKER_WEBUI_PORT=8081
fi
#8. The start_instance function
function start_instance {
# Set WORKER_NUM=$1
WORKER_NUM=$1
shift
if [ "$SPARK_WORKER_PORT" = "" ]; then
PORT_FLAG=
PORT_NUM=
else
PORT_FLAG="--port"
PORT_NUM=$(( $SPARK_WORKER_PORT + $WORKER_NUM - 1 ))
fi
WEBUI_PORT=$(( $SPARK_WORKER_WEBUI_PORT + $WORKER_NUM - 1 ))
#Expanded: spark-daemon.sh start org.apache.spark.deploy.worker.Worker 1 --webui-port 8081 spark://hostname:7077
#Control passes to spark-daemon.sh again; see the analysis above
"${SPARK_HOME}/sbin"/spark-daemon.sh start $CLASS $WORKER_NUM \
--webui-port "$WEBUI_PORT" $PORT_FLAG $PORT_NUM $MASTER "$@"
}
#9. Check whether SPARK_WORKER_INSTANCES (the number of Worker processes per node) is empty (see the example after this script)
# If empty, run start_instance 1 "$@"
# Otherwise loop:
# for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
# start_instance $(( 1 + $i )) "$@"
# done
if [ "$SPARK_WORKER_INSTANCES" = "" ]; then
start_instance 1 "$@"
else
for ((i=0; i<$SPARK_WORKER_INSTANCES; i++)); do
#10. Call the start_instance function
start_instance $(( 1 + $i )) "$@"
done
fi
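To illustrate step 9: when SPARK_WORKER_INSTANCES is set, start-slave.sh launches several Worker processes on the same node, offsetting the port and web UI port per instance. A sketch with example values (master01 is a placeholder):
# Start two Workers on this node against one Master (example values)
export SPARK_WORKER_INSTANCES=2
export SPARK_WORKER_PORT=7078         # instance 1 gets 7078, instance 2 gets 7079
export SPARK_WORKER_WEBUI_PORT=8081   # instance 1 gets 8081, instance 2 gets 8082
sbin/start-slave.sh spark://master01:7077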
2. Other Scripts
2.1 start-history-server.sh
#1. If SPARK_HOME is unset, set it to the parent directory of this script's directory
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
#2. Source ${SPARK_HOME}/sbin/spark-config.sh, as analyzed above
. "${SPARK_HOME}/sbin/spark-config.sh"
#3. Source ${SPARK_HOME}/bin/load-spark-env.sh, as analyzed above
. "${SPARK_HOME}/bin/load-spark-env.sh"
#4. exec "${SPARK_HOME}/sbin"/spark-daemon.sh start org.apache.spark.deploy.history.HistoryServer 1 "$@"; see the analysis of spark-daemon.sh above
exec "${SPARK_HOME}/sbin"/spark-daemon.sh start org.apache.spark.deploy.history.HistoryServer 1 "$@"
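The history server is only useful once applications write event logs somewhere it can read them; a minimal sketch, assuming an HDFS directory (the namenode URL and path are placeholders):
# spark-env.sh (sketch): tell the history server where to find event logs
export SPARK_HISTORY_OPTS="-Dspark.history.fs.logDirectory=hdfs://namenode:8020/spark-logs"
# applications must also set spark.eventLog.enabled=true and spark.eventLog.dir to the same location
sbin/start-history-server.sh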
2.2 start-shuffle-service.sh
#1. If SPARK_HOME is unset, set it to the parent directory of this script's directory
if [ -z "${SPARK_HOME}" ]; then
export SPARK_HOME="$(cd "`dirname "$0"`"/..; pwd)"
fi
#2. Source ${SPARK_HOME}/sbin/spark-config.sh, as analyzed above
. "${SPARK_HOME}/sbin/spark-config.sh"
#3. Source ${SPARK_HOME}/bin/load-spark-env.sh, as analyzed above
. "${SPARK_HOME}/bin/load-spark-env.sh"
#4. exec "${SPARK_HOME}/sbin"/spark-daemon.sh start org.apache.spark.deploy.ExternalShuffleService 1; see the analysis of spark-daemon.sh above
exec "${SPARK_HOME}/sbin"/spark-daemon.sh start org.apache.spark.deploy.ExternalShuffleService 1
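The external shuffle service is typically used together with dynamic allocation, and applications have to opt in to it; a hedged sketch of the related settings:
# Applications opt in via spark-defaults.conf (or --conf), typically:
#   spark.shuffle.service.enabled    true
#   spark.dynamicAllocation.enabled  true
# then start the service itself on each worker node:
sbin/start-shuffle-service.sh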
2.3 start-thriftserver.sh
Starts the Thrift server; omitted here.
3. spark-submit Processing Logic
The sections above covered the scripts that start Spark. This section looks at how the Spark source handles task submission, following spark-submit step by step through the overall submission flow.
3.1 spark-submit
# -z tests whether the variable that follows is empty (true if empty); shell variables are expanded inside double quotes but not inside single quotes
# This step checks whether SPARK_HOME is empty; if it is, the then-branch runs
# source filename reads and executes the commands in filename in the current bash environment
# $0 is the file name of the shell script itself, here spark-submit
# dirname $0 gives the directory the current script lives in
# $(command) expands to the output of that command
# So the whole if statement means: if SPARK_HOME has no value, run find-spark-home from this script's directory to set it
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
#Run the spark-class script, passing org.apache.spark.deploy.SparkSubmit and "$@"
#Here "$@" is all of the arguments originally given to spark-submit
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
So the overall logic of the spark-submit script is:
first check whether SPARK_HOME is set; if it is not, source find-spark-home to set it, and then exec spark-class with org.apache.spark.deploy.SparkSubmit and the original arguments.
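Every argument typed after spark-submit therefore travels unchanged (as "$@") into org.apache.spark.deploy.SparkSubmit. A sketch of a typical submission, in which the class name, jar path, input/output paths and master URL are placeholders:
# Example submission; all names, paths and the master URL are placeholders
bin/spark-submit \
  --master spark://master01:7077 \
  --deploy-mode client \
  --class com.example.WordCount \
  --executor-memory 2g \
  /path/to/wordcount.jar /data/input /data/output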
3.2 find-spark-home
# Path of the find_spark_home.py script, used below to decide whether this is a pip-installed PySpark
FIND_SPARK_HOME_PYTHON_SCRIPT="$(cd "$(dirname "$0")"; pwd)/find_spark_home.py"
# Short circuit if the user already has this set.
## If SPARK_HOME is already non-empty, exit successfully
if [ ! -z "${SPARK_HOME}" ]; then
exit 0
# -f tests whether the file exists and is a regular file (true if so); if find_spark_home.py is not present we are not pip-installed, so the branch below sets SPARK_HOME to the parent directory of this script
elif [ ! -f "$FIND_SPARK_HOME_PYTHON_SCRIPT" ]; then
# If we are not in the same directory as find_spark_home.py we are not pip installed so we don't
# need to search the different Python directories for a Spark installation.
# Note only that, if the user has pip installed PySpark but is directly calling pyspark-shell or
# spark-submit in another directory we want to use that version of PySpark rather than the
# pip installed version of PySpark.
export SPARK_HOME="$(cd "$(dirname "$0")"/..; pwd)"
else
# We are pip installed, use the Python script to resolve a reasonable SPARK_HOME
# Default to standard python interpreter unless told otherwise
if [[ -z "$PYSPARK_DRIVER_PYTHON" ]]; then
PYSPARK_DRIVER_PYTHON="${PYSPARK_PYTHON:-"python"}"
fi
export SPARK_HOME=$($PYSPARK_DRIVER_PYTHON "$FIND_SPARK_HOME_PYTHON_SCRIPT")
fi
As you can see, if the user has not set SPARK_HOME in advance, the script determines it automatically and exports it as an environment variable for later use.
Once SPARK_HOME is set, spark-class is executed; this is the important part of our analysis. The source is as follows:
3.3 spark-class
#!/usr/bin/env bash
#Check and set SPARK_HOME, as before
if [ -z "${SPARK_HOME}" ]; then
source "$(dirname "$0")"/find-spark-home
fi
#Run the load-spark-env.sh script, whose main purpose is to load and set some variables:
#it exports the settings in spark-env.sh into the environment for later use
#and sets the Scala version variable
. "${SPARK_HOME}"/bin/load-spark-env.sh
# Find the java binary
#Check and set the java executable path
#-n tests whether the variable has non-zero length (true if non-empty)
#If Java is installed but JAVA_HOME is not set, command -v java returns the path of the java binary found on the PATH
if [ -n "${JAVA_HOME}" ]; then
RUNNER="${JAVA_HOME}/bin/java"
else
if [ "$(command -v java)" ]; then
RUNNER="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi
# Find Spark jars.
#-d tests whether the path is a directory (true if it is)
#Set the jar directory and the related classpath variable
if [ -d "${SPARK_HOME}/jars" ]; then
SPARK_JARS_DIR="${SPARK_HOME}/jars"
else
SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
fi
if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
echo "You need to build Spark with the target \"package\" before running this program." 1>&2
exit 1
else
LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
fi
# Add the launcher build dir to the classpath if requested.
if [ -n "$SPARK_PREPEND_CLASSES" ]; then
LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
fi
# For tests
if [[ -n "$SPARK_TESTING" ]]; then
unset YARN_CONF_DIR
unset HADOOP_CONF_DIR
fi
# The launcher library will print arguments separated by a NULL character, to allow arguments with
# characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
# an array that will be used to exec the final command.
#
# The exit code of the launcher is appended to the output, so the parent shell removes it from the
# command array and checks the value to see if the launcher succeeded.
#Run the class org.apache.spark.launcher.Main, which returns the parsed command
build_command() {
"$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
printf "%d\0" $?
}
# Turn off posix mode since it does not allow process substitution
#Collect the arguments produced by build_command into the CMD array
set +o posix
CMD=()
while IFS= read -d '' -r ARG; do
CMD+=("$ARG")
done < <(build_command "$@")
COUNT=${#CMD[@]}
LAST=$((COUNT - 1))
LAUNCHER_EXIT_CODE=${CMD[$LAST]}
# Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
# the code that parses the output of the launcher to get confused. In those cases, check if the
# exit code is an integer, and if it's not, handle it as a special error case.
if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
echo "${CMD[@]}" | head -n-1 1>&2
exit 1
fi
if [ $LAUNCHER_EXIT_CODE != 0 ]; then
exit $LAUNCHER_EXIT_CODE
fi
CMD=("${CMD[@]:0:$LAST}")
#exec the command held in CMD; here its main class is org.apache.spark.deploy.SparkSubmit
exec "${CMD[@]}"
The execution logic of spark-class is a bit more involved; overall it is:
1. Check SPARK_HOME.
2. Run load-spark-env.sh to set the environment variables that will be needed, such as the Scala version; this also loads spark-env.sh.
3. Check and set the java executable path.
4. Locate the Spark jars and set the classpath variables.
5. Run org.apache.spark.launcher.Main and collect the parsed command into CMD.
6. Check that the launcher's exit code is a valid integer and zero (i.e. that the user-supplied arguments were accepted).
7. exec the resulting command, whose main class is org.apache.spark.deploy.SparkSubmit.
3.4 SparkSubmit
The exec at the end of 3.1 hands control to SparkSubmit (D:\src\spark-2.3.0\core\src\main\scala\org\apache\spark\deploy\SparkSubmit.scala):
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
override def main(args: Array[String]): Unit = {
// Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
// be reset before the application starts.
val uninitLog = initializeLogIfNecessary(true, silent = true)
//Get the arguments passed in by the submit script
val appArgs = new SparkSubmitArguments(args)
if (appArgs.verbose) {
// scalastyle:off println
printStream.println(appArgs)
// scalastyle:on println
}
//Match the parsed action and dispatch to the corresponding method
appArgs.action match {
//Submit the application with the given arguments
case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
//Only triggered in standalone and Mesos cluster modes
case SparkSubmitAction.KILL => kill(appArgs)
//Only triggered in standalone and Mesos cluster modes
case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
}
}
3.4.1 submit is the key step and consists of two parts:
(1) call prepareSubmitEnvironment
(2) call doRunMain

