海豚調度器(dolphinscheduler)的容錯,如圖:
容錯有兩種情況:第一種是啟動第一個master的時候,該master節點會掃描processInstance表裡面仍處於執行狀態
的processInstance,為其重新生成command命令,同時將該條processInstance記錄的host置為null。
執行狀態是指:
/**
 * Ordinals of the {@code ExecutionStatus} constants that this note treats as
 * "still executing": SUBMITTED_SUCCESS, RUNNING_EXECUTION, READY_PAUSE and READY_STOP.
 */
private final int[] stateArray = {
    ExecutionStatus.SUBMITTED_SUCCESS.ordinal(),
    ExecutionStatus.RUNNING_EXECUTION.ordinal(),
    ExecutionStatus.READY_PAUSE.ordinal(),
    ExecutionStatus.READY_STOP.ordinal()
};
啟動時只有一個master節點時的容錯代碼:
// Startup fault tolerance: runs only when getActiveMasterNum() reports exactly one
// active master (presumably this node is the first master to come up).
if (getActiveMasterNum() == 1) {
// A null path names no specific dead node: removeZKNodePath then skips the
// dead-server handling and calls failoverServerWhenDown with a null host.
removeZKNodePath(null, ZKNodeType.MASTER, true);
// Same null-path failover, this time for the worker node type.
removeZKNodePath(null, ZKNodeType.WORKER, true);
}
第二種情況是ZK上註冊了若干個節點,以A、B、C三個節點為例:當C節點突然下線時,A、B會收到通知,並為host是C的ip且處於上述執行狀態
的processInstance重新生成command命令,同時將該條processInstance記錄的host置為null。這個過程到底由A、B中的誰來做呢?誰先搶到ZK的鎖就由誰來做。
/**
 * Routes a tree-cache change to the master or worker handler, depending on
 * which parent znode the changed path lives under. Paths under neither parent
 * are ignored.
 *
 * @param client curator client that delivered the event (not used here)
 * @param event  the tree-cache event
 * @param path   znode path the event refers to
 */
@Override
protected void dataChanged(CuratorFramework client, TreeCacheEvent event, String path) {
    // master subtree is checked first
    final String masterPrefix = getZNodeParentPath(ZKNodeType.MASTER) + Constants.SINGLE_SLASH;
    if (path.startsWith(masterPrefix)) {
        handleMasterEvent(event, path);
        return;
    }

    // the worker prefix is only computed when the path is not a master path
    final String workerPrefix = getZNodeParentPath(ZKNodeType.WORKER) + Constants.SINGLE_SLASH;
    if (path.startsWith(workerPrefix)) {
        handleWorkerEvent(event, path);
    }
}
/**
 * Reacts to a change reported for a master znode.
 *
 * <p>A removed node triggers {@code removeZKNodePath} with failover enabled;
 * an added node is only logged; every other event type is ignored.
 *
 * @param event tree-cache event whose type selects the action
 * @param path  znode path the event refers to
 */
public void handleMasterEvent(TreeCacheEvent event, String path) {
    switch (event.getType()) {
        case NODE_REMOVED:
            // a master znode disappeared: handle the dead server and fail over
            removeZKNodePath(path, ZKNodeType.MASTER, true);
            return;
        case NODE_ADDED:
            logger.info("master node added : {}", path);
            return;
        default:
            // nothing to do for other event types
            return;
    }
}
/**
 * Handles a removed (or, with a null/empty path, unspecified) server znode and
 * optionally fails over the work of that server.
 *
 * <p>All work is done after acquiring the distributed failover lock for
 * {@code zkNodeType}; the lock release is attempted in {@code finally} whether
 * or not the work succeeded. Failures are logged and not propagated to the caller.
 * If the thread is interrupted while waiting for the lock or during failover,
 * the interrupt status is restored after the lock release attempt so that
 * callers can still observe the interruption.
 *
 * @param path       zookeeper node path; when null or empty, no dead-server
 *                   handling is done and failover is invoked with a null host
 * @param zkNodeType zookeeper node type
 * @param failover   whether to run failover after the dead-server handling
 */
private void removeZKNodePath(String path, ZKNodeType zkNodeType, boolean failover) {
    logger.info("{} node deleted : {}", zkNodeType, path);
    InterProcessMutex mutex = null;
    // remembers an interruption so it can be re-asserted once the lock is released
    boolean interrupted = false;
    try {
        String failoverPath = getFailoverLockPath(zkNodeType);
        // create a distributed lock
        mutex = new InterProcessMutex(getZkClient(), failoverPath);
        mutex.acquire();

        String serverHost = null;
        if (StringUtils.isNotEmpty(path)) {
            serverHost = getHostByEventDataPath(path);
            if (StringUtils.isEmpty(serverHost)) {
                logger.error("server down error: unknown path: {}", path);
                return;
            }
            // handle dead server
            handleDeadServer(path, zkNodeType, Constants.ADD_ZK_OP);
        }
        //failover server
        if (failover) {
            failoverServerWhenDown(serverHost, zkNodeType);
        }
    } catch (Exception e) {
        if (e instanceof InterruptedException) {
            // do not lose the interruption: restore it after the lock is released
            interrupted = true;
        }
        logger.error("{} server failover failed.", zkNodeType);
        logger.error("failover exception ", e);
    } finally {
        releaseMutex(mutex);
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }
}