執行start-dfs.sh腳本后,集群是如何啟動的?
本文閱讀並注釋了start-dfs腳本,以及namenode和datanode的啟動主要流程流程源碼。
閱讀源碼前准備
源碼獲取
-
拉取Apache Hadoop官方源碼
-
用idea打開...
-
切換到想看的版本...
這里用的最新版本3.3.1
閱讀目標
本篇的閱讀目標是搞明白hadoop中的start-dfs.sh啟動腳本執行后都做了什么,hadoop中的NameNode,DataNode啟動過程等大致流程,不會細追細節。
start-dfs.sh 干了什么
hdfs集群的啟動命令為:start-dfs.sh, 腳本的位置在下圖中:

腳本中大致分位兩塊內容,第一部分是調用hdfs-config.sh腳本配置hdfs以及hadoop的參數以及環境等,第二部分是啟動datanode、namenode以及secondary namenode等等。我們的重點是看第二部分的啟動流程。
hdfs-config 簡述
start-dfs.sh中啟動hdfs-config.sh的代碼如下:
# let's locate libexec...
if [[ -n "${HADOOP_HOME}" ]]; then
HADOOP_DEFAULT_LIBEXEC_DIR="${HADOOP_HOME}/libexec"
else
HADOOP_DEFAULT_LIBEXEC_DIR="${bin}/../libexec"
fi
HADOOP_LIBEXEC_DIR="${HADOOP_LIBEXEC_DIR:-$HADOOP_DEFAULT_LIBEXEC_DIR}"
# shellcheck disable=SC2034
HADOOP_NEW_CONFIG=true
if [[ -f "${HADOOP_LIBEXEC_DIR}/hdfs-config.sh" ]]; then
. "${HADOOP_LIBEXEC_DIR}/hdfs-config.sh"
else
echo "ERROR: Cannot execute ${HADOOP_LIBEXEC_DIR}/hdfs-config.sh." 2>&1
exit 1
fi
在hdfs-config.sh腳本中會嘗試啟動hdfs-evn.sh腳本(如果存在)
之后會檢查以及設置HDFS的各種參數,例如:
# turn on the defaults
export HDFS_AUDIT_LOGGER=${HDFS_AUDIT_LOGGER:-INFO,NullAppender}
export HDFS_NAMENODE_OPTS=${HDFS_NAMENODE_OPTS:-"-Dhadoop.security.logger=INFO,RFAS"}
export HDFS_SECONDARYNAMENODE_OPTS=${HDFS_SECONDARYNAMENODE_OPTS:-"-Dhadoop.security.logger=INFO,RFAS"}
export HDFS_DATANODE_OPTS=${HDFS_DATANODE_OPTS:-"-Dhadoop.security.logger=ERROR,RFAS"}
export HDFS_PORTMAP_OPTS=${HDFS_PORTMAP_OPTS:-"-Xmx512m"}
# depending upon what is being used to start Java, these may need to be
# set empty. (thus no colon)
export HDFS_DATANODE_SECURE_EXTRA_OPTS=${HDFS_DATANODE_SECURE_EXTRA_OPTS-"-jvm server"}
export HDFS_NFS3_SECURE_EXTRA_OPTS=${HDFS_NFS3_SECURE_EXTRA_OPTS-"-jvm server"}
再之后會啟動hadoop-config.sh腳本:
# shellcheck source=./hadoop-common-project/hadoop-common/src/main/bin/hadoop-config.sh
if [[ -n "${HADOOP_COMMON_HOME}" ]] &&
[[ -e "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh" ]]; then
. "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh"
elif [[ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]]; then
. "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh"
elif [ -e "${HADOOP_HOME}/libexec/hadoop-config.sh" ]; then
. "${HADOOP_HOME}/libexec/hadoop-config.sh"
else
echo "ERROR: Hadoop common not found." 2>&1
exit 1
fi
hadoop-config.sh是最基本的、公用的環境變量配置腳本,會再調用etc/hadoop/hadoop-env.sh腳本。主要是配置java啟動項相關參數,比如java執行環境的classpath等。
hdfs-config.sh一系列腳本的整體功能就是保證啟動hdfs集群前對hdfs和hadoop的各種環境變量進行配置。
start-dfs.sh后續就是逐步啟動各個節點(namenodes,datanodes,secondary namenodes,quorumjournal nodes,quorumjournal nodes),如果是ha集群還會啟動ZK Failover controllers
NameNode 啟動流程
腳本代碼
start-dfs.sh中啟動namenode的代碼:
#---------------------------------------------------------
# namenodes
# 找到配置中(如果沒有配置取當前節點為)的namenode節點
NAMENODES=$("${HADOOP_HDFS_HOME}/bin/hdfs" getconf -namenodes 2>/dev/null)
if [[ -z "${NAMENODES}" ]]; then
NAMENODES=$(hostname)
fi
# 執行啟動命令
echo "Starting namenodes on [${NAMENODES}]"
hadoop_uservar_su hdfs namenode "${HADOOP_HDFS_HOME}/bin/hdfs" \
--workers \
--config "${HADOOP_CONF_DIR}" \
--hostnames "${NAMENODES}" \
--daemon start \
namenode ${nameStartOpt}
HADOOP_JUMBO_RETCOUNTER=$?
去hadoop-hdfs > src > mian > bin > hdfs中查看namenode命令:
# 命令描述:用於啟動namenode
hadoop_add_subcommand "namenode" daemon "run the DFS namenode"
# 命令處理程序
namenode)
HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
HADOOP_CLASSNAME='org.apache.hadoop.hdfs.server.namenode.NameNode'
hadoop_add_param HADOOP_OPTS hdfs.audit.logger "-Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER}"
;;
# 執行函數命令
# everything is in globals at this point, so call the generic handler
hadoop_generic_java_subcmd_handler
這里就定位到了具體的處理類org.apache.hadoop.hdfs.server.namenode.NameNode。繼續跟進到hadoop_generic_java_subcmd_handler函數定義的地方-腳本hdfs:
## @description Handle subcommands from main program entries
## @audience private
## @stability evolving
## @replaceable yes
function hadoop_generic_java_subcmd_handler
{
# ...... 省略
# do the hard work of launching a daemon or just executing our interactive
# java class
if [[ "${HADOOP_SUBCMD_SUPPORTDAEMONIZATION}" = true ]]; then
if [[ "${HADOOP_SUBCMD_SECURESERVICE}" = true ]]; then
hadoop_secure_daemon_handler \
"${HADOOP_DAEMON_MODE}" \
"${HADOOP_SUBCMD}" \
"${HADOOP_SECURE_CLASSNAME}" \
"${daemon_pidfile}" \
"${daemon_outfile}" \
"${priv_pidfile}" \
"${priv_outfile}" \
"${priv_errfile}" \
"${HADOOP_SUBCMD_ARGS[@]}"
else
hadoop_daemon_handler \
"${HADOOP_DAEMON_MODE}" \
"${HADOOP_SUBCMD}" \
"${HADOOP_CLASSNAME}" \
"${daemon_pidfile}" \
"${daemon_outfile}" \
"${HADOOP_SUBCMD_ARGS[@]}"
fi
exit $?
else
hadoop_java_exec "${HADOOP_SUBCMD}" "${HADOOP_CLASSNAME}" "${HADOOP_SUBCMD_ARGS[@]}"
fi
}
function hadoop_java_exec
{
# run a java command. this is used for
# non-daemons
local command=$1
local class=$2
shift 2
hadoop_debug "Final CLASSPATH: ${CLASSPATH}"
hadoop_debug "Final HADOOP_OPTS: ${HADOOP_OPTS}"
hadoop_debug "Final JAVA_HOME: ${JAVA_HOME}"
hadoop_debug "java: ${JAVA}"
hadoop_debug "Class name: ${class}"
hadoop_debug "Command line options: $*"
export CLASSPATH
#shellcheck disable=SC2086
exec "${JAVA}" "-Dproc_${command}" ${HADOOP_OPTS} "${class}" "$@"
}
可以看到最終還是利用java命令來執行該類。
NameNode 源碼
在源碼中定位到org.apache.hadoop.hdfs.server.namenode.NameNode類。按照類的加載順序來看NameNode啟動流程:
靜態代碼塊
static{
HdfsConfiguration.init();
}
// 繼續跟進代碼,進入HdfsConfiguration類中:
@InterfaceAudience.Private
public class HdfsConfiguration extends Configuration {
static {
addDeprecatedKeys();
// 加載默認的配置文件
Configuration.addDefaultResource("hdfs-default.xml");
Configuration.addDefaultResource("hdfs-rbf-default.xml");
Configuration.addDefaultResource("hdfs-site.xml");
Configuration.addDefaultResource("hdfs-rbf-site.xml");
}
/**
* 這個方法在這里,這樣當調用HdfsConfiguration時,如果它之前還沒有被加載,它將被類加載。
* 在加載類時,將執行上面的靜態初始化塊來添加已棄用的鍵並添加默認資源。
* 這個方法被多次調用是安全的,因為靜態初始化器塊只會被調用一次。
* 這取代了以前其他類直接調用Configuration.addDefaultResource("hdfs-default.xml")而不首先加載該類的危險做法,從而跳過了棄用鍵。
*/
public static void init() {
}
mian方法
public static void main(String argv[]) throws Exception {
//分析傳入的參數,是否是幫助參數
if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
System.exit(0);
}
try {
//打印一些啟動日志信息
StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
//創建namenode
NameNode namenode = createNameNode(argv, null);
if (namenode != null) {
//加入集群,HA,聯邦集群都是有多個NameNode
namenode.join();
}
} catch (Throwable e) {
LOG.error("Failed to start namenode.", e);
terminate(1, e);
}
}
需要關注的是NameNode namenode = createNameNode(argv, null);
public static NameNode createNameNode(String argv[], Configuration conf)
throws IOException {
// 日志信息
LOG.info("createNameNode " + Arrays.asList(argv));
if (conf == null)
// 准備配置文件對象
conf = new HdfsConfiguration();
// 將一些通用參數解析到配置中。
GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
argv = hParser.getRemainingArgs();
// 解析其余的NameNode特定參數,放到配置中。
StartupOption startOpt = parseArguments(argv);
if (startOpt == null) {
printUsage(System.err);
return null;
}
setStartupOption(conf, startOpt);
boolean aborted = false;
// 針對startup對象進行switch case 選擇
switch (startOpt) {
case FORMAT:
// 格式化
// 安裝hadoop后第一次啟動之前要執行的格式化命令 hadoop namenode -format
aborted = format(conf, startOpt.getForceFormat(),
startOpt.getInteractiveFormat());
terminate(aborted ? 1 : 0);
return null; // avoid javac warning
case GENCLUSTERID:
String clusterID = NNStorage.newClusterID();
LOG.info("Generated new cluster id: {}", clusterID);
terminate(0);
return null;
case ROLLBACK:
aborted = doRollback(conf, true);
terminate(aborted ? 1 : 0);
return null; // avoid warning
case BOOTSTRAPSTANDBY:
String[] toolArgs = Arrays.copyOfRange(argv, 1, argv.length);
int rc = BootstrapStandby.run(toolArgs, conf);
terminate(rc);
return null; // avoid warning
case INITIALIZESHAREDEDITS:
aborted = initializeSharedEdits(conf,
startOpt.getForceFormat(),
startOpt.getInteractiveFormat());
terminate(aborted ? 1 : 0);
return null; // avoid warning
case BACKUP:
case CHECKPOINT:
NamenodeRole role = startOpt.toNodeRole();
DefaultMetricsSystem.initialize(role.toString().replace(" ", ""));
return new BackupNode(conf, role);
case RECOVER:
NameNode.doRecovery(startOpt, conf);
return null;
case METADATAVERSION:
printMetadataVersion(conf);
terminate(0);
return null; // avoid javac warning
case UPGRADEONLY:
DefaultMetricsSystem.initialize("NameNode");
new NameNode(conf);
terminate(0);
return null;
default:
// 正常啟動的時候就會走到這里
// metrics:度量系統記錄詳細運行信息
DefaultMetricsSystem.initialize("NameNode");
// 初始化NameNode
return new NameNode(conf);
}
}
NameNode構造方法
public NameNode(Configuration conf) throws IOException {
// 默認為正常的NameNode
this(conf, NamenodeRole.NAMENODE);
}
protected NameNode(Configuration conf, NamenodeRole role)
throws IOException {
// 將配置文件賦值到父類的靜態變量中
super(conf);
// 初始化Tracer
// 在“進程”中使用一個Tracer實例來收集和分發它的跟蹤范圍。Tracer實例是所有跟蹤的一站式商店。
this.tracer = new Tracer.Builder("NameNode").
conf(TraceUtils.wrapHadoopConf(NAMENODE_HTRACE_PREFIX, conf)).
build();
// TracerConfigurationManager類提供了通過RPC協議在運行時管理跟蹤器配置的函數。
this.tracerConfigurationManager =
new TracerConfigurationManager(NAMENODE_HTRACE_PREFIX, conf);
this.role = role;
// clientNamenodeAddress : 客戶端將用來訪問這個namenode或名稱服務的namenode地址。對於使用邏輯URI的HA配置,它將是邏輯地址。
String nsId = getNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId);
clientNamenodeAddress = NameNodeUtils.getClientNamenodeAddress(
conf, nsId);
// 虛擬機中搭建集群啟動日志打印為:
// 2021-07-07 17:45:46,560 INFO org.apache.hadoop.hdfs.server.namenode.NameNode: Clients are to use wanglj01:9000 to access this namenode/service.
if (clientNamenodeAddress != null) {
LOG.info("Clients should use {} to access"
+ " this namenode/service.", clientNamenodeAddress);
}
// ha集群相關
this.haEnabled = HAUtil.isHAEnabled(conf, nsId);
state = createHAState(getStartupOption(conf));
this.allowStaleStandbyReads = HAUtil.shouldAllowStandbyReads(conf);
this.haContext = createHAContext();
try {
// 給聯邦模式下准備的,主要是設置聯邦模式下namenode的地址和RPC地址
initializeGenericKeys(conf, nsId, namenodeId);
// 初始化namenode的核心方法
initialize(getConf());
// HA相關內容
state.prepareToEnterState(haContext);
try {
haContext.writeLock();
state.enterState(haContext);
} finally {
haContext.writeUnlock();
}
} catch (IOException e) {
this.stopAtException(e);
throw e;
} catch (HadoopIllegalArgumentException e) {
this.stopAtException(e);
throw e;
}
// 啟動成功
notBecomeActiveInSafemode = conf.getBoolean(
DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE,
DFS_HA_NN_NOT_BECOME_ACTIVE_IN_SAFEMODE_DEFAULT);
this.started.set(true);
}
NameNode#initialize
protected void initialize(Configuration conf) throws IOException {
if (conf.get(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS) == null) {
String intervals = conf.get(DFS_METRICS_PERCENTILES_INTERVALS_KEY);
if (intervals != null) {
conf.set(HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS,
intervals);
}
}
// UserGroupInformation類作用:
// Hadoop的用戶和組信息。該類封裝了一個JAAS Subject,並提供了確定用戶用戶名和組的方法。它同時支持Windows、Unix和Kerberos登錄模塊。
// 方法簡介:設置UGI的靜態配置。特別是設置安全身份驗證機制和組查找服務。
UserGroupInformation.setConfiguration(conf);
// 以NameNode配置的用戶登錄。
loginAsNameNodeUser(conf);
// 初始化namemode的度量系統
NameNode.initMetrics(conf, this.getRole());
StartupProgressMetrics.register(startupProgress);
// 初始化jvm監聽的度量系統
// JvmPauseMonitor類的作用:
// 此類建立一個簡單的線程。在此線程中,在循環中運行sleep一段時間方法,如果sleep花費的時間比傳遞給sleep方法的時間長,
// 就意味着JVM或者宿主機已經出現了停頓處理現象,可能會導致其它問題,如果這種停頓被監測出來,線程會打印一個消息。
pauseMonitor = new JvmPauseMonitor();
pauseMonitor.init(conf);
pauseMonitor.start();
metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);
if (conf.getBoolean(DFS_NAMENODE_GC_TIME_MONITOR_ENABLE,
DFS_NAMENODE_GC_TIME_MONITOR_ENABLE_DEFAULT)) {
long observationWindow = conf.getTimeDuration(
DFS_NAMENODE_GC_TIME_MONITOR_OBSERVATION_WINDOW_MS,
DFS_NAMENODE_GC_TIME_MONITOR_OBSERVATION_WINDOW_MS_DEFAULT,
TimeUnit.MILLISECONDS);
long sleepInterval = conf.getTimeDuration(
DFS_NAMENODE_GC_TIME_MONITOR_SLEEP_INTERVAL_MS,
DFS_NAMENODE_GC_TIME_MONITOR_SLEEP_INTERVAL_MS_DEFAULT,
TimeUnit.MILLISECONDS);
gcTimeMonitor = new Builder().observationWindowMs(observationWindow)
.sleepIntervalMs(sleepInterval).build();
gcTimeMonitor.start();
metrics.getJvmMetrics().setGcTimeMonitor(gcTimeMonitor);
}
// 啟動httpserver
if (NamenodeRole.NAMENODE == role) {
startHttpServer(conf);
}
// 啟動nameNode時從磁盤加載fsimage以及edits文件,初始化FsNamesystem、FsDirectory、 LeaseManager等
loadNamesystem(conf);
startAliasMapServerIfNecessary(conf);
// 創建rpcserver,支持namenode與datanode,client進行通信的協議
// 封裝了NameNodeRpcServer clientRpcServer,支持 ClientNamenodeProtocol、DatanodeProtocolPB等協議
// 啥是rpc看這里:https://www.zhihu.com/question/25536695
rpcServer = createRpcServer(conf);
initReconfigurableBackoffKey();
if (clientNamenodeAddress == null) {
// This is expected for MiniDFSCluster. Set it now using
// the RPC server's bind address.
clientNamenodeAddress =
NetUtils.getHostPortString(getNameNodeAddress());
LOG.info("Clients are to use " + clientNamenodeAddress + " to access"
+ " this namenode/service.");
}
if (NamenodeRole.NAMENODE == role) {
httpServer.setNameNodeAddress(getNameNodeAddress());
httpServer.setFSImage(getFSImage());
if (levelDBAliasMapServer != null) {
httpServer.setAliasMap(levelDBAliasMapServer.getAliasMap());
}
}
// 啟動常用到主備狀態的服務
startCommonServices(conf);
// 啟動一個計時器,定期將NameNode度量寫入日志文件。可以通過配置禁用此行為。
startMetricsLogger(conf);
}
NameNode#startCommonServices
/** Start the services common to active and standby states */
private void startCommonServices(Configuration conf) throws IOException {
// 創建NameNodeResourceChecker、激活BlockManager等
namesystem.startCommonServices(conf, haContext);
registerNNSMXBean();
// 非NamenodeRole.NAMENODE的角色在此處啟動HttpServer
if (NamenodeRole.NAMENODE != role) {
startHttpServer(conf);
httpServer.setNameNodeAddress(getNameNodeAddress());
httpServer.setFSImage(getFSImage());
if (levelDBAliasMapServer != null) {
httpServer.setAliasMap(levelDBAliasMapServer.getAliasMap());
}
}
// 啟動RPCServer
rpcServer.start();
// 啟動各插件
try {
plugins = conf.getInstances(DFS_NAMENODE_PLUGINS_KEY,
ServicePlugin.class);
} catch (RuntimeException e) {
String pluginsValue = conf.get(DFS_NAMENODE_PLUGINS_KEY);
LOG.error("Unable to load NameNode plugins. Specified list of plugins: " +
pluginsValue, e);
throw e;
}
for (ServicePlugin p: plugins) {
try {
p.start(this);
} catch (Throwable t) {
LOG.warn("ServicePlugin " + p + " could not be started", t);
}
}
LOG.info(getRole() + " RPC up at: " + getNameNodeAddress());
if (rpcServer.getServiceRpcAddress() != null) {
LOG.info(getRole() + " service RPC up at: "
+ rpcServer.getServiceRpcAddress());
}
}
FSNamesystem#startCommonServices
方法用於啟動對主備狀態都通用的服務:
void startCommonServices(Configuration conf, HAContext haContext) throws IOException {
this.registerMBean(); // register the MBean for the FSNamesystemState
writeLock();
this.haContext = haContext;
try {
// 創建NameNodeResourceChecker(資源檢查線程),並立即檢查一次
nnResourceChecker = new NameNodeResourceChecker(conf);
checkAvailableResources();
assert !blockManager.isPopulatingReplQueues();
// 設置一些啟動過程中的信息
StartupProgress prog = NameNode.getStartupProgress();
prog.beginPhase(Phase.SAFEMODE);
long completeBlocksTotal = getCompleteBlocksTotal();
prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
completeBlocksTotal);
// 激活blockManager: 保存與存儲在Hadoop集群中的塊相關的信息。
blockManager.activate(conf, completeBlocksTotal);
} finally {
writeUnlock("startCommonServices");
}
registerMXBean();
DefaultMetricsSystem.instance().register(this);
if (inodeAttributeProvider != null) {
inodeAttributeProvider.start();
dir.setINodeAttributeProvider(inodeAttributeProvider);
}
snapshotManager.registerMXBean();
InetSocketAddress serviceAddress = NameNode.getServiceAddress(conf, true);
this.nameNodeHostName = (serviceAddress != null) ?
serviceAddress.getHostName() : "";
}
BlockManager#activate
public void activate(Configuration conf, long blockTotal) {
// 啟動PendingReplicationBlocks,這個類主要是對數據塊進行一些記賬工作。類似於Block可能存放在那個Datanode上這種。
pendingReconstruction.start();
// 激活DatanodeManager:啟動DecommissionManager--Monitor、HeartbeatManager-- Monitor
datanodeManager.activate(conf);
// 啟動redundancyThread, 大致作用是:
// 計算可以在數據節點上調度的塊復制和塊失效工作。datanode將在下一個心跳時被告知這項工作
// 如果有任何重構請求超時,獲取它們並將它們放回需要的重構隊列中
// 重新掃描之前推遲的塊列表
this.redundancyThread.setName("RedundancyMonitor");
this.redundancyThread.start();
// 啟動storageInfoDefragmenterThread
// 它監視StorageInfo TreeSet的碎片,並在它低於某個閾值時壓縮它。
storageInfoDefragmenterThread.setName("StorageInfoMonitor");
storageInfoDefragmenterThread.start();
//塊匯報線程穹頂(心跳檢測機制)
this.blockReportThread.start();
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
// 初始化安全模式
bmSafeMode.activate(blockTotal);
}
namenode的主要責任是文件元信息與數據塊映射的管理。相應的,namenode的啟動流程需要關注
與客戶端、datanode通信的工作線程,文件元信息的管理機制,數據塊的管理機制等。其中,
RpcServer主要負責與客戶端、datanode通信,FSDirectory主要負責管理文件元信息。
