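Some work can only run on a single server at a time, the master being the typical case. HA (High Availability) therefore requires, first, that several servers be deployed, and second, that they automatically elect one server into the active state while the others remain in standby; only the active server is allowed to perform the restricted operations. When the active server can no longer serve for whatever reason (for example it crashes or loses its network connection), the standby servers automatically elect a new active server right away, so the service fails over seamlessly.
Hadoop master HA is implemented with ZooKeeper and comes in two forms: hdfs ha (HA for the NameNode) and yarn ha (HA for the ResourceManager). The two have a lot in common but also differ in places.
1 Observed behavior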
1.1 hdfs ha
zookeeper path:
/hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb
/hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock
Configuration:
<property>
<name>ha.zookeeper.parent-znode</name>
<value>/hadoop-ha</value>
<description>
The ZooKeeper znode under which the ZK failover controller stores
its information. Note that the nameservice ID is automatically
appended to this znode, so it is not normally necessary to
configure this, even in a federated environment.
</description>
</property>
<property>
<name>dfs.nameservices</name>
<value></value>
<description>
Comma-separated list of nameservices.
</description>
</property>
1.2 yarn ha
zookeeper path:
/yarn-leader-election/${yarn.resourcemanager.cluster-id}/ActiveBreadCrumb
/yarn-leader-election/${yarn.resourcemanager.cluster-id}/ActiveStandbyElectorLock
Configuration:
<property>
<description>The base znode path to use for storing leader information,
when using ZooKeeper based leader election.</description>
<name>yarn.resourcemanager.ha.automatic-failover.zk-base-path</name>
<value>/yarn-leader-election</value>
</property>
<property>
<description>Name of the cluster. In a HA setting,
this is used to ensure the RM participates in leader
election for this cluster and ensures it does not affect
other clusters</description>
<name>yarn.resourcemanager.cluster-id</name>
<!--value>yarn-cluster</value-->
</property>
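Why are there two znodes in ZooKeeper, ActiveBreadCrumb and ActiveStandbyElectorLock? ActiveStandbyElectorLock is the actual lock: it is an ephemeral node, so it disappears when the active server's ZooKeeper session ends. ActiveBreadCrumb is used for fencing: it persists and records which server was last active, so a newly elected server knows which one to fence.
2 Code implementation
Both the hdfs and yarn HA implementations ultimately rely on ActiveStandbyElector. Let's look at each in turn.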
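The division of labor between the two znodes can be sketched without a real ZooKeeper. The following is only an illustration under stated assumptions: FakeZk is a hypothetical in-memory stand-in (not the ZooKeeper client API), the nameservice ns1 and the node names nn1/nn2 are made up, and "fencing" is reduced to returning a string. It also ignores the case where an active server steps down gracefully.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for ZooKeeper; NOT the real client API.
class FakeZk {
    // path -> data, for nodes that survive the end of a session
    private final Map<String, String> persistent = new HashMap<>();
    // path -> {data, ownerSession}, for ephemeral nodes
    private final Map<String, String[]> ephemeral = new HashMap<>();

    // Returns true if the ephemeral node was created, false if it already exists.
    boolean createEphemeral(String path, String data, String session) {
        if (ephemeral.containsKey(path)) {
            return false;
        }
        ephemeral.put(path, new String[] {data, session});
        return true;
    }

    void setPersistent(String path, String data) {
        persistent.put(path, data);
    }

    String getPersistent(String path) {
        return persistent.get(path);
    }

    // Ending a session removes every ephemeral node that session owns.
    void closeSession(String session) {
        ephemeral.values().removeIf(v -> v[1].equals(session));
    }
}

class BreadcrumbDemo {
    // "ns1" is an assumed nameservice ID used only for this example.
    static final String LOCK = "/hadoop-ha/ns1/ActiveStandbyElectorLock";
    static final String CRUMB = "/hadoop-ha/ns1/ActiveBreadCrumb";

    // Tries to take the lock; reports the resulting role and whom to fence, if anyone.
    static String tryBecomeActive(FakeZk zk, String me) {
        if (!zk.createEphemeral(LOCK, me, me)) {
            return "standby"; // someone else holds the lock
        }
        String previous = zk.getPersistent(CRUMB);
        zk.setPersistent(CRUMB, me); // record ourselves as the latest active
        if (previous != null && !previous.equals(me)) {
            return "active, fenced " + previous;
        }
        return "active";
    }

    public static void main(String[] args) {
        FakeZk zk = new FakeZk();
        System.out.println(tryBecomeActive(zk, "nn1"));
        System.out.println(tryBecomeActive(zk, "nn2"));
        zk.closeSession("nn1"); // nn1 crashes: its lock vanishes, its breadcrumb stays
        System.out.println(tryBecomeActive(zk, "nn2"));
    }
}
```

The point of the sketch is that the lock alone cannot tell the new active server who held it before, because the lock is gone by the time the election is re-run; the persistent breadcrumb carries that information.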
2.1 hdfs ha
zkfc startup command
$HADOOP_HOME/bin/hdfs
elif [ "$COMMAND" = "zkfc" ] ; then
CLASS='org.apache.hadoop.hdfs.tools.DFSZKFailoverController'
HADOOP_OPTS="$HADOOP_OPTS $HADOOP_ZKFC_OPTS"
Code
org.apache.hadoop.hdfs.tools.DFSZKFailoverController
public static void main(String args[]) throws Exception {
  if (DFSUtil.parseHelpArgument(args,
      ZKFailoverController.USAGE, System.out, true)) {
    System.exit(0);
  }

  GenericOptionsParser parser = new GenericOptionsParser(
      new HdfsConfiguration(), args);
  DFSZKFailoverController zkfc = DFSZKFailoverController.create(
      parser.getConfiguration());
  int retCode = 0;
  try {
    retCode = zkfc.run(parser.getRemainingArgs());
  } catch (Throwable t) {
    LOG.fatal("Got a fatal error, exiting now", t);
  }
  System.exit(retCode);
}
DFSZKFailoverController.main calls run, which is a method of the parent class ZKFailoverController and in turn calls doRun. Here is the parent class:
org.apache.hadoop.ha.ZKFailoverController
private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode";
static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha";

private int doRun(String[] args)
    throws HadoopIllegalArgumentException, IOException, InterruptedException {
  try {
    initZK();
  } catch (KeeperException ke) {
    LOG.fatal("Unable to start failover controller. Unable to connect "
        + "to ZooKeeper quorum at " + zkQuorum + ". Please check the "
        + "configured value for " + ZK_QUORUM_KEY + " and ensure that "
        + "ZooKeeper is running.");
    return ERR_CODE_NO_ZK;
  }
  if (args.length > 0) {
    if ("-formatZK".equals(args[0])) {
      boolean force = false;
      boolean interactive = true;
      for (int i = 1; i < args.length; i++) {
        if ("-force".equals(args[i])) {
          force = true;
        } else if ("-nonInteractive".equals(args[i])) {
          interactive = false;
        } else {
          badArg(args[i]);
        }
      }
      return formatZK(force, interactive);
    } else {
      badArg(args[0]);
    }
  }

  if (!elector.parentZNodeExists()) {
    LOG.fatal("Unable to start failover controller. "
        + "Parent znode does not exist.\n"
        + "Run with -formatZK flag to initialize ZooKeeper.");
    return ERR_CODE_NO_PARENT_ZNODE;
  }

  try {
    localTarget.checkFencingConfigured();
  } catch (BadFencingConfigurationException e) {
    LOG.fatal("Fencing is not configured for " + localTarget + ".\n"
        + "You must configure a fencing method before using automatic "
        + "failover.", e);
    return ERR_CODE_NO_FENCER;
  }

  initRPC();
  initHM();
  startRPC();
  try {
    mainLoop();
  } finally {
    rpcServer.stopAndJoin();
    elector.quitElection(true);
    healthMonitor.shutdown();
    healthMonitor.join();
  }
  return 0;
}

private void initZK() throws HadoopIllegalArgumentException, IOException,
    KeeperException {
  ...
  elector = new ActiveStandbyElector(zkQuorum,
      zkTimeout, getParentZnode(), zkAcls, zkAuths,
      new ElectorCallbacks(), maxRetryNum);
}

private String getParentZnode() {
  String znode = conf.get(ZK_PARENT_ZNODE_KEY,
      ZK_PARENT_ZNODE_DEFAULT);
  if (!znode.endsWith("/")) {
    znode += "/";
  }
  return znode + getScopeInsideParentNode();
}

class ElectorCallbacks implements ActiveStandbyElectorCallback {
  @Override
  public void becomeActive() throws ServiceFailedException {
    ZKFailoverController.this.becomeActive();
  }

  @Override
  public void becomeStandby() {
    ZKFailoverController.this.becomeStandby();
  }

  @Override
  public void enterNeutralMode() {
  }

  @Override
  public void notifyFatalError(String errorMessage) {
    fatalError(errorMessage);
  }

  @Override
  public void fenceOldActive(byte[] data) {
    ZKFailoverController.this.fenceOldActive(data);
  }

  @Override
  public String toString() {
    synchronized (ZKFailoverController.this) {
      return "Elector callbacks for " + localTarget;
    }
  }
}
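doRun calls several methods; the two most important are initZK and initHM:
initZK uses getParentZnode to work out the zk path and creates the ActiveStandbyElector. The key point is that an instance of the inner class ElectorCallbacks is passed into ActiveStandbyElector: from then on, zk state changes travel along the chain ActiveStandbyElector -> ElectorCallbacks -> ZKFailoverController and end in a state transition such as becomeActive or becomeStandby.
initZK only fixes the zk path and the callbacks; nothing is actually created in ZooKeeper yet. The real work starts in initHM: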
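How the configured parent znode, the scope (the nameservice ID in the hdfs case) and the two file names combine into the paths listed in section 1.1 can be shown in isolation. This is a minimal sketch: the class ZnodePaths and its method names are hypothetical, the nameservice ns1 is made up, and only the trailing-slash handling is taken from getParentZnode.

```java
// Hypothetical helper; only the string handling mirrors the Hadoop code.
class ZnodePaths {
    static final String LOCK_FILENAME = "ActiveStandbyElectorLock";
    static final String BREADCRUMB_FILENAME = "ActiveBreadCrumb";

    // Same trailing-slash handling as getParentZnode: append "/" only if missing.
    static String parentZnode(String configuredParent, String scope) {
        String znode = configuredParent;
        if (!znode.endsWith("/")) {
            znode += "/";
        }
        return znode + scope;
    }

    // The elector derives both working paths from the parent znode.
    static String lockPath(String parentZnode) {
        return parentZnode + "/" + LOCK_FILENAME;
    }

    static String breadcrumbPath(String parentZnode) {
        return parentZnode + "/" + BREADCRUMB_FILENAME;
    }

    public static void main(String[] args) {
        String parent = parentZnode("/hadoop-ha", "ns1"); // "ns1" is an assumed nameservice ID
        System.out.println(lockPath(parent));
        System.out.println(breadcrumbPath(parent));
    }
}
```

Because the slash is added only when it is missing, configuring the parent as /hadoop-ha or /hadoop-ha/ yields the same paths.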
org.apache.hadoop.ha.ZKFailoverController
private void initHM() {
  healthMonitor = new HealthMonitor(conf, localTarget);
  healthMonitor.addCallback(new HealthCallbacks());
  healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
  healthMonitor.start();
}
initHM creates a HealthMonitor, registers HealthCallbacks on it, and then starts it. Here is HealthMonitor:
org.apache.hadoop.ha.HealthMonitor
void start() {
  daemon.start();
}

private void doHealthChecks() throws InterruptedException {
  while (shouldRun) {
    HAServiceStatus status = null;
    boolean healthy = false;
    try {
      status = proxy.getServiceStatus();
      proxy.monitorHealth();
      healthy = true;
    } catch (HealthCheckFailedException e) {
      LOG.warn("Service health check failed for " + targetToMonitor
          + ": " + e.getMessage());
      enterState(State.SERVICE_UNHEALTHY);
    } catch (Throwable t) {
      LOG.warn("Transport-level exception trying to monitor health of "
          + targetToMonitor + ": " + t.getLocalizedMessage());
      RPC.stopProxy(proxy);
      proxy = null;
      enterState(State.SERVICE_NOT_RESPONDING);
      Thread.sleep(sleepAfterDisconnectMillis);
      return;
    }

    if (status != null) {
      setLastServiceStatus(status);
    }
    if (healthy) {
      enterState(State.SERVICE_HEALTHY);
    }

    Thread.sleep(checkIntervalMillis);
  }
}

private synchronized void enterState(State newState) {
  if (newState != state) {
    LOG.info("Entering state " + newState);
    state = newState;
    synchronized (callbacks) {
      for (Callback cb : callbacks) {
        cb.enteredState(newState);
      }
    }
  }
}
HealthMonitor.start starts the internal MonitorDaemon thread, which calls HealthMonitor.doHealthChecks in a loop. doHealthChecks calls enterState as the observed state changes, and enterState iterates over all registered callbacks. This is the Observer pattern; the part that matters is the callback, namely HealthCallbacks.
First, the MonitorDaemon thread:
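The Observer mechanism described above can be reduced to a few lines. This is a stripped-down sketch, not the Hadoop class: MiniHealthMonitor is a hypothetical name, the state enum is shortened, and there is no RPC or daemon thread. It keeps only the one property that matters downstream: callbacks fire when the state changes, not on every health check.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified analogue of HealthMonitor's callback handling.
class MiniHealthMonitor {
    enum State { INITIALIZING, SERVICE_HEALTHY, SERVICE_UNHEALTHY }

    // Observer interface: implemented by whoever wants to react to state changes.
    interface Callback {
        void enteredState(State newState);
    }

    private final List<Callback> callbacks = new ArrayList<>();
    private State state = State.INITIALIZING;

    void addCallback(Callback cb) {
        synchronized (callbacks) {
            callbacks.add(cb);
        }
    }

    // Notifies observers only when the state actually changes.
    synchronized void enterState(State newState) {
        if (newState != state) {
            state = newState;
            synchronized (callbacks) {
                for (Callback cb : callbacks) {
                    cb.enteredState(newState);
                }
            }
        }
    }

    public static void main(String[] args) {
        MiniHealthMonitor hm = new MiniHealthMonitor();
        hm.addCallback(s -> System.out.println("observer saw " + s));
        hm.enterState(State.SERVICE_HEALTHY);   // notifies
        hm.enterState(State.SERVICE_HEALTHY);   // same state, no notification
        hm.enterState(State.SERVICE_UNHEALTHY); // notifies
    }
}
```

In this sketch a repeated healthy check is invisible to the observer, which is why a callback that reacts to every notification does not get invoked once per check interval.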
org.apache.hadoop.ha.HealthMonitor.MonitorDaemon
public void run() {
  while (shouldRun) {
    try {
      loopUntilConnected();
      doHealthChecks();
    } catch (InterruptedException ie) {
      Preconditions.checkState(!shouldRun,
          "Interrupted but still supposed to run");
    }
  }
}
Then HealthCallbacks:
org.apache.hadoop.ha.ZKFailoverController.HealthCallbacks
class HealthCallbacks implements HealthMonitor.Callback {
  @Override
  public void enteredState(HealthMonitor.State newState) {
    setLastHealthState(newState);
    recheckElectability();
  }
}
This calls ZKFailoverController.recheckElectability:
org.apache.hadoop.ha.ZKFailoverController
private void recheckElectability() {
  // Maintain lock ordering of elector -> ZKFC
  synchronized (elector) {
    synchronized (this) {
      boolean healthy = lastHealthState == State.SERVICE_HEALTHY;

      long remainingDelay = delayJoiningUntilNanotime - System.nanoTime();
      if (remainingDelay > 0) {
        if (healthy) {
          LOG.info("Would have joined master election, but this node is "
              + "prohibited from doing so for "
              + TimeUnit.NANOSECONDS.toMillis(remainingDelay) + " more ms");
        }
        scheduleRecheck(remainingDelay);
        return;
      }

      switch (lastHealthState) {
      case SERVICE_HEALTHY:
        elector.joinElection(targetToData(localTarget));
        if (quitElectionOnBadState) {
          quitElectionOnBadState = false;
        }
        break;

      case INITIALIZING:
        LOG.info("Ensuring that " + localTarget + " does not "
            + "participate in active master election");
        elector.quitElection(false);
        serviceState = HAServiceState.INITIALIZING;
        break;

      case SERVICE_UNHEALTHY:
      case SERVICE_NOT_RESPONDING:
        LOG.info("Quitting master election for " + localTarget
            + " and marking that fencing is necessary");
        elector.quitElection(true);
        serviceState = HAServiceState.INITIALIZING;
        break;

      case HEALTH_MONITOR_FAILED:
        fatalError("Health monitor failed!");
        break;

      default:
        throw new IllegalArgumentException("Unhandled state:"
            + lastHealthState);
      }
    }
  }
}
When the state is SERVICE_HEALTHY, this calls ActiveStandbyElector.joinElection. Here is ActiveStandbyElector:
org.apache.hadoop.ha.ActiveStandbyElector
public class ActiveStandbyElector implements StatCallback, StringCallback {

  public ActiveStandbyElector(String zookeeperHostPorts,
      int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
      List<ZKAuthInfo> authInfo, ActiveStandbyElectorCallback app,
      int maxRetryNum) throws IOException, HadoopIllegalArgumentException,
      KeeperException {
    ...
    znodeWorkingDir = parentZnodeName;
    zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
    zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
    ...

  public synchronized void joinElection(byte[] data)
      throws HadoopIllegalArgumentException {
    if (data == null) {
      throw new HadoopIllegalArgumentException("data cannot be null");
    }
    if (wantToBeInElection) {
      LOG.info("Already in election. Not re-connecting.");
      return;
    }
    appData = new byte[data.length];
    System.arraycopy(data, 0, appData, 0, data.length);

    LOG.debug("Attempting active election for " + this);
    joinElectionInternal();
  }

  private void joinElectionInternal() {
    Preconditions.checkState(appData != null,
        "trying to join election without any app data");
    if (zkClient == null) {
      if (!reEstablishSession()) {
        fatalError("Failed to reEstablish connection with ZooKeeper");
        return;
      }
    }

    createRetryCount = 0;
    wantToBeInElection = true;
    createLockNodeAsync();
  }

  private void createLockNodeAsync() {
    zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,
        this, zkClient);
  }
ActiveStandbyElector implements two ZooKeeper callback interfaces, StatCallback and StringCallback. The call sequence is joinElection -> joinElectionInternal -> createLockNodeAsync, which ends in the asynchronous ZooKeeper.create method with the elector passing itself as the callback. The result of the create call is therefore delivered back to ActiveStandbyElector.processResult, and processResult in turn calls ElectorCallbacks. That completes the whole chain.
ZooKeeper's StringCallback interface looks like this:
org.apache.zookeeper.AsyncCallback.StringCallback
interface StringCallback extends AsyncCallback {
  public void processResult(int rc, String path, Object ctx, String name);
}
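To make the callback chain concrete, here is a self-contained sketch of two contenders racing for one lock node. Everything in it is a simplified assumption: FakeZk replaces the real client and calls the callback synchronously, the interfaces are local copies with the same method shape as above, the constants OK and NODE_EXISTS stand in for ZooKeeper's result codes, and retries, watches, session expiry and fencing are left out entirely.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified sketch of the create -> processResult -> callbacks chain.
class ElectionSketch {
    // Stand-ins for ZooKeeper result codes (assumed values, used only here).
    static final int OK = 0;
    static final int NODE_EXISTS = -110;

    // Local copy of the callback shape shown above.
    interface StringCallback {
        void processResult(int rc, String path, Object ctx, String name);
    }

    // Reduced version of the application-side callbacks.
    interface ElectorCallbacks {
        void becomeActive();
        void becomeStandby();
    }

    // Hypothetical in-memory "ZooKeeper": the first create of a path wins.
    static class FakeZk {
        private final Set<String> nodes = new HashSet<>();

        void create(String path, StringCallback cb, Object ctx) {
            int rc = nodes.add(path) ? OK : NODE_EXISTS;
            cb.processResult(rc, path, ctx, path); // real ZooKeeper calls back asynchronously
        }
    }

    static class Elector implements StringCallback {
        private final FakeZk zk;
        private final String lockPath;
        private final ElectorCallbacks app;

        Elector(FakeZk zk, String lockPath, ElectorCallbacks app) {
            this.zk = zk;
            this.lockPath = lockPath;
            this.app = app;
        }

        void joinElection() {
            zk.create(lockPath, this, zk); // pass ourselves as the callback
        }

        @Override
        public void processResult(int rc, String path, Object ctx, String name) {
            if (rc == OK) {
                app.becomeActive();   // we created the lock node
            } else if (rc == NODE_EXISTS) {
                app.becomeStandby();  // someone else already holds it
            }
            // Other result codes need retry or error handling, omitted in this sketch.
        }
    }

    // Records the last role it was told to take.
    static class Node implements ElectorCallbacks {
        String state = "INITIALIZING";

        @Override
        public void becomeActive() { state = "ACTIVE"; }

        @Override
        public void becomeStandby() { state = "STANDBY"; }
    }

    public static void main(String[] args) {
        FakeZk zk = new FakeZk();
        Node a = new Node();
        Node b = new Node();
        new Elector(zk, "/lock", a).joinElection();
        new Elector(zk, "/lock", b).joinElection();
        System.out.println(a.state + " " + b.state);
    }
}
```

The elector never decides who wins; it only translates the outcome of the create call into a role, which is why the same class can serve any component that supplies its own callbacks.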
2.2 yarn ha
org.apache.hadoop.yarn.conf.YarnConfiguration
public static final String AUTO_FAILOVER_ZK_BASE_PATH =
    AUTO_FAILOVER_PREFIX + "zk-base-path";
public static final String DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH =
    "/yarn-leader-election";
This is where the configuration key and its default value are defined.
org.apache.hadoop.yarn.server.resourcemanager.EmbeddedElectorService
protected void serviceInit(Configuration conf)
    throws Exception {
  ...
  String zkBasePath = conf.get(YarnConfiguration.AUTO_FAILOVER_ZK_BASE_PATH,
      YarnConfiguration.DEFAULT_AUTO_FAILOVER_ZK_BASE_PATH);
  String electionZNode = zkBasePath + "/" + clusterId;
  ...
  elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
      electionZNode, zkAcls, zkAuths, this, maxRetryNum);
  ...

@Override
protected void serviceStart() throws Exception {
  elector.joinElection(localActiveNodeInfo);
  super.serviceStart();
}
The flow is much the same as in ZKFailoverController above: EmbeddedElectorService.serviceInit builds the zk path and creates the ActiveStandbyElector, and serviceStart then calls ActiveStandbyElector.joinElection.