
#!/usr/bin/env bash
################################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# Starts the cluster: first the JobManager instance(s), then the TaskManagers.
# Relies on names that are not defined in this script and are expected to be
# provided by the sourced config.sh: HIGH_AVAILABILITY, FLINK_BIN_DIR,
# FLINK_SSH_OPTS, readMasters and TMSlaves.

# Resolve the absolute directory of this script so config.sh is found
# regardless of the caller's working directory. Abort if it cannot be
# resolved, rather than sourcing config.sh from an unrelated directory.
bin=$(dirname "$0")
bin=$(cd "$bin" && pwd) || exit 1

. "$bin"/config.sh

# Start the JobManager instance(s)

# Ignore case while pattern-matching the HA mode below ("ZooKeeper",
# "zookeeper", ... are all accepted by [[ == ]] while nocasematch is on).
shopt -s nocasematch
if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
    # HA Mode
    # NOTE(review): readMasters presumably fills MASTERS, WEBUIPORTS and
    # MASTERS_ALL_LOCALHOST - confirm against config.sh.
    readMasters

    echo "Starting HA cluster with ${#MASTERS[@]} masters."

    for ((i=0;i<${#MASTERS[@]};++i)); do
        master=${MASTERS[i]}
        webuiport=${WEBUIPORTS[i]}

        # Plain [ ] with a quoted operand: stays case-sensitive even though
        # nocasematch is on, and does not raise "unary operator expected"
        # when the variable is empty or unset.
        if [ "${MASTERS_ALL_LOCALHOST}" = true ] ; then
            "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
        else
            # FLINK_SSH_OPTS is intentionally unquoted: it may carry several
            # ssh options that must be split into separate words.
            # shellcheck disable=SC2086
            ssh -n $FLINK_SSH_OPTS "$master" -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
        fi
    done

else
    echo "Starting cluster."

    # Start single JobManager on this machine
    "$FLINK_BIN_DIR"/jobmanager.sh start
fi
shopt -u nocasematch

# Start TaskManager instance(s)
TMSlaves start
其中绝大部分配置、环境变量相关的工作,由 . "$bin"/config.sh 来完成,包括JAVA的环境变量、Flink的运行目录等配置。config.sh中还定义了一些工具方法,供Flink的各个脚本使用,比如解析主机地址、启动和停止服务等。
# Start the JobManager instance(s): one per configured master in HA mode,
# otherwise a single JobManager on this machine. Whether the comparison with
# "zookeeper" is case-sensitive follows the shell's current nocasematch option.
if [[ $HIGH_AVAILABILITY == "zookeeper" ]]; then
    # HA Mode
    # NOTE(review): readMasters presumably fills MASTERS, WEBUIPORTS and
    # MASTERS_ALL_LOCALHOST - confirm against its definition.
    readMasters

    echo "Starting HA cluster with ${#MASTERS[@]} masters."

    for ((i=0;i<${#MASTERS[@]};++i)); do
        master=${MASTERS[i]}
        webuiport=${WEBUIPORTS[i]}

        # Quoted operand: no "unary operator expected" error when the
        # variable is empty or unset; the non-local branch is taken instead.
        if [ "${MASTERS_ALL_LOCALHOST}" = true ] ; then
            "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
        else
            # FLINK_SSH_OPTS is intentionally unquoted: it may carry several
            # ssh options that must be split into separate words.
            # shellcheck disable=SC2086
            ssh -n $FLINK_SSH_OPTS "$master" -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/jobmanager.sh\" start ${master} ${webuiport} &"
        fi
    done

else
    echo "Starting cluster."

    # Start single JobManager on this machine
    "$FLINK_BIN_DIR"/jobmanager.sh start
fi
第一步启动jobmanager,分两种情况:
第一种情况是HA模式。高可用标识HIGH_AVAILABILITY的取值逻辑在config.sh中实现,简单理解就是在flink-conf.yaml文件中配置recovery.mode。从字面上理解,其过程是:如果所有master都在本机上,
就在不同的端口上分别启动jobmanager;如果master不全在本机,则通过ssh以nohup & 的方式在对应的机器上启动jobmanager。
第二种情况是非HA模式,直接在本机执行 jobmanager.sh start。
第二步启动TaskManager实例,所调用的TMSlaves方法也在config.sh中定义,实现方式如下:
#######################################
# Starts or stops TaskManagers on all slaves.
# Usage:     TMSlaves start|stop
# Globals:   FLINK_BIN_DIR, FLINK_SSH_OPTS (read);
#            SLAVES, SLAVES_ALL_LOCALHOST (read after calling readSlaves)
# Arguments: $1 - command passed through to taskmanager.sh
# NOTE(review): readSlaves is defined elsewhere and presumably fills SLAVES
# and SLAVES_ALL_LOCALHOST - confirm against its definition.
#######################################
TMSlaves() {
    # Function-local so nothing leaks into the caller's scope; bash's dynamic
    # scoping still makes CMD visible to readSlaves should it look at it.
    local CMD=$1
    local slave

    readSlaves

    # Quoted operand: an empty or unset SLAVES_ALL_LOCALHOST selects the
    # non-local branch instead of raising "unary operator expected".
    if [ "${SLAVES_ALL_LOCALHOST}" = true ] ; then
        # all-local setup: run taskmanager.sh once per configured slave entry
        for slave in "${SLAVES[@]}"; do
            "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
        done
    else
        # non-local setup
        # Run the command through pdsh (Parallel Distributed Shell) when it
        # is available, otherwise fall back to one ssh call per slave.
        if ! command -v pdsh >/dev/null 2>&1; then
            for slave in "${SLAVES[@]}"; do
                # FLINK_SSH_OPTS is intentionally unquoted: it may carry
                # several ssh options that must be split into separate words.
                # shellcheck disable=SC2086
                ssh -n $FLINK_SSH_OPTS "$slave" -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
            done
        else
            # The two assignments apply only to the environment of this pdsh
            # invocation; the host list is SLAVES joined with commas.
            PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND="$FLINK_SSH_OPTS" pdsh -w "$(IFS=, ; echo "${SLAVES[*]}")" \
                "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
        fi
    fi
}
内容比较简单:通过config.sh中定义的readSlaves方法获取slaves信息,如果所有的slaves都在本机,就直接在本地启动taskmanager。如果有其他机器,在有pdsh工具的情况下,用pdsh通过
nohup方式调用taskmanager.sh来启动taskmanager;没有pdsh工具的情况下,则逐台ssh到对应的机器上,以nohup & 的方式执行同样的命令。