
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Starts a Flink cluster: first the JobManager instance(s), then the
# TaskManager instance(s) via TMSlaves.
#
# FLINK_BIN_DIR, HIGH_AVAILABILITY, FLINK_SSH_OPTS, readMasters and TMSlaves
# are not defined here; presumably they all come from the sourced config.sh
# (TODO confirm for readMasters).

bin=$(dirname "$0")
bin=$(cd "$bin" && pwd)

# shellcheck source=config.sh
. "$bin"/config.sh

# Start the JobManager instance(s)
# Ignore case when matching patterns, so the HIGH_AVAILABILITY comparison
# below is case-insensitive.
shopt -s nocasematch
if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
    # HA Mode: readMasters is expected to fill MASTERS and WEBUIPORTS
    # (parallel arrays) and set MASTERS_ALL_LOCALHOST.
    readMasters

    echo "Starting HA cluster with ${#MASTERS[@]} masters."

    for ((i = 0; i < ${#MASTERS[@]}; ++i)); do
        master=${MASTERS[i]}
        webuiport=${WEBUIPORTS[i]}

        # Quoted so an empty/unset flag yields a false test instead of a
        # "unary operator expected" error.
        if [ "${MASTERS_ALL_LOCALHOST}" = true ]; then
            "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
        else
            # FLINK_SSH_OPTS is left unquoted on purpose so it can carry
            # several ssh options.
            # shellcheck disable=SC2086
            ssh -n $FLINK_SSH_OPTS "$master" -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
        fi
    done
else
    echo "Starting cluster."

    # Start single JobManager on this machine
    "$FLINK_BIN_DIR"/jobmanager.sh start
fi
shopt -u nocasematch

# Start TaskManager instance(s)
TMSlaves start
其中絕大部分與配置、環境變量相關的工作由 `. "$bin"/config.sh` 來完成,包括 Java 的環境變量、Flink 的運行目錄等配置。config.sh 還定義了一些工具方法供其他 Flink 腳本使用,比如解析主機地址、啟動和停止服務等。
# Start the JobManager instance(s).
# NOTE: in start-cluster.sh this comparison runs with `shopt -s nocasematch`
# enabled, so HIGH_AVAILABILITY is matched case-insensitively.
if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
    # HA Mode: readMasters is expected to fill MASTERS and WEBUIPORTS
    # (parallel arrays) and set MASTERS_ALL_LOCALHOST.
    readMasters

    echo "Starting HA cluster with ${#MASTERS[@]} masters."

    for ((i = 0; i < ${#MASTERS[@]}; ++i)); do
        master=${MASTERS[i]}
        webuiport=${WEBUIPORTS[i]}

        # Quoted so an empty/unset flag yields a false test instead of a
        # "unary operator expected" error.
        if [ "${MASTERS_ALL_LOCALHOST}" = true ]; then
            "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
        else
            # FLINK_SSH_OPTS is left unquoted on purpose so it can carry
            # several ssh options.
            # shellcheck disable=SC2086
            ssh -n $FLINK_SSH_OPTS "$master" -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
        fi
    done
else
    echo "Starting cluster."

    # Start single JobManager on this machine
    "$FLINK_BIN_DIR"/jobmanager.sh start
fi
第一步啟動 JobManager,分兩種情況:
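第一種情況是 HA(高可用)模式。高可用標誌 HIGH_AVAILABILITY 的相關邏輯在 config.sh 中實現,簡單理解就是在 flink-conf.yaml 文件中配置的 recovery.mode。腳本在比較前開啟了 nocasematch,所以 "zookeeper" 的匹配不區分大小寫。具體過程是:readMasters 讀出 master 列表及對應的 Web UI 端口。如果所有 master 都在本機上,就在本機依次以各自(不同)的端口啟動 JobManager;如果 master 不都在本地,就通過 ssh 登錄到每台 master,以 nohup … & 的方式在後台啟動 JobManager。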
第二種情況是非 HA 模式,直接在本機執行 jobmanager.sh start,只啟動一個 JobManager。
第二步啟動 TaskManager 實例。這裡調用的 TMSlaves 方法同樣定義在 config.sh 中,實現如下:
# starts or stops TMs on all slaves
# TMSlaves start|stop
#
# $1 - the command passed verbatim to taskmanager.sh (e.g. start or stop).
#
# Calls readSlaves, which is expected to set the SLAVES array and the
# SLAVES_ALL_LOCALHOST flag (defined elsewhere -- TODO confirm).
#   - all slaves local: runs taskmanager.sh <cmd> on this machine once per
#     SLAVES entry (the entry's value itself is not used);
#   - otherwise: runs it on every slave, through a single pdsh call when
#     pdsh is available, else through one ssh call per slave.
TMSlaves() {
    local cmd=$1
    local slave

    readSlaves

    # Quoted so an empty/unset flag yields a false test instead of a
    # "unary operator expected" error.
    if [ "${SLAVES_ALL_LOCALHOST}" = true ]; then
        # all-local setup
        for slave in "${SLAVES[@]}"; do
            "${FLINK_BIN_DIR}"/taskmanager.sh "${cmd}"
        done
    else
        # non-local setup
        # Start/stop TaskManager instance(s) using pdsh (Parallel Distributed
        # Shell) when available, otherwise fall back to ssh per slave.
        if ! command -v pdsh >/dev/null 2>&1; then
            for slave in "${SLAVES[@]}"; do
                # FLINK_SSH_OPTS is left unquoted on purpose so it can carry
                # several ssh options.
                # shellcheck disable=SC2086
                ssh -n $FLINK_SSH_OPTS "$slave" -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${cmd}\" &"
            done
        else
            # The PDSH_SSH_ARGS* assignments apply to this pdsh call only;
            # -w receives the slaves joined with commas.
            PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS \
                pdsh -w "$(IFS=,; echo "${SLAVES[*]}")" \
                "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${cmd}\""
        fi
    fi
}
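內容比較簡單:先通過 config.sh 中定義的 readSlaves 方法獲取 slaves 信息。如果所有的 slave 都是本機,就直接在本地執行 taskmanager.sh,slaves 列表中有幾項就啟動幾個 TaskManager。如果有其他機器,則分兩種情況:安裝了 pdsh 工具時,用一次 pdsh 調用在所有 slave 上以 nohup 方式執行 taskmanager.sh;沒有 pdsh 工具時,逐個 ssh 到對應的機器上,以 nohup … & 的方式在後台執行同樣的命令。由於 TMSlaves 的參數(start 或 stop)會原樣傳給 taskmanager.sh,同一個方法也可以用來停止 TaskManager。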