This chapter analyzes how Nacos cluster nodes detect each other coming online and going offline.
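- Every 5 seconds, the scheduled task ServerListManager.ServerListUpdater refreshes the node list and picks up nodes that were added or removed.
- Every 2 seconds, the scheduled task ServerListManager.ServerStatusReporter sends a heartbeat to every node in the cluster.
- Every 5 seconds, the scheduled task ServerStatusManager.ServerStatusUpdater checks and controls the working status of the local server.
- When a node receives a heartbeat from another node, it records that node's latest heartbeat timestamp in a Map.
- Each run of ServerStatusReporter calls checkDistroHeartbeat, which updates the set of healthy server nodes.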
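Main classes involved in Nacos clustering:
// A member node of the Nacos cluster
com.alibaba.nacos.naming.cluster.servers.Server;
// Manager that globally refreshes and operates on the server list
com.alibaba.nacos.naming.cluster.ServerListManager;
// Detects and controls the working status of the local server
com.alibaba.nacos.naming.cluster.ServerStatusManager;
// Reports the local server's status to other servers
com.alibaba.nacos.naming.misc.ServerStatusSynchronizer;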
- Starting the scheduled tasks
/**
* The manager to globally refresh and operate server list.
*/
@Component("serverListManager")
public class ServerListManager {
//Objects to notify when the set of server nodes changes
private List<ServerChangeListener> listeners = new ArrayList<>();
//All server nodes in the cluster
private List<Server> servers = new ArrayList<>();
//Healthy server nodes in the cluster
private List<Server> healthyServers = new ArrayList<>();
//Cluster nodes grouped by site (key = site)
private Map<String, List<Server>> distroConfig = new ConcurrentHashMap<>();
//Last heartbeat timestamp of each node, keyed by Server.getKey()
private Map<String, Long> distroBeats = new ConcurrentHashMap<>(16);
//Start two scheduled tasks when the service starts
@PostConstruct
public void init() {
//Equivalent to: executorService.scheduleAtFixedRate(new ServerListUpdater(), 0, NACOS_SERVER_LIST_REFRESH_INTERVAL /* 5000 */, TimeUnit.MILLISECONDS);
GlobalExecutor.registerServerListUpdater(new ServerListUpdater());
//Equivalent to: SERVER_STATUS_EXECUTOR.schedule(new ServerStatusReporter(), 2000, TimeUnit.MILLISECONDS); the task then reschedules itself after each run
GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000);
}
}
@Service
public class ServerStatusManager {
private ServerStatus serverStatus = ServerStatus.STARTING;
@PostConstruct
public void init() {
//Equivalent to: executorService.scheduleAtFixedRate(runnable, 0, SERVER_STATUS_UPDATE_PERIOD /* 5000 */, TimeUnit.MILLISECONDS);
GlobalExecutor.registerServerStatusUpdater(new ServerStatusUpdater());
}
}
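The registrations above use two different scheduling styles: ServerListUpdater and ServerStatusUpdater run at a fixed rate, while ServerStatusReporter is scheduled once and re-registers itself in the finally block of its run method, re-reading the period from switchDomain each time. A minimal sketch of both patterns with a plain ScheduledExecutorService, with periods shortened to 10 ms for demonstration; SchedulingSketch and SelfReschedulingTask are hypothetical names, not Nacos classes:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

class SchedulingSketch {

    /** Runs once, then re-registers itself; the delay is re-read on every run. */
    static final class SelfReschedulingTask implements Runnable {
        private final ScheduledExecutorService executor;
        // Hypothetical stand-in for switchDomain.getServerStatusSynchronizationPeriodMillis()
        private final LongSupplier periodMillis;
        final AtomicInteger runs = new AtomicInteger();

        SelfReschedulingTask(ScheduledExecutorService executor, LongSupplier periodMillis) {
            this.executor = executor;
            this.periodMillis = periodMillis;
        }

        @Override
        public void run() {
            try {
                runs.incrementAndGet(); // the real work (sending heartbeats) would go here
            } finally {
                try {
                    executor.schedule(this, periodMillis.getAsLong(), TimeUnit.MILLISECONDS);
                } catch (RejectedExecutionException ignored) {
                    // executor has been shut down: stop rescheduling
                }
            }
        }
    }

    /** Runs both patterns for the given time and returns {fixedRateRuns, selfReschedulingRuns}. */
    static int[] runFor(long millis) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(2);
        AtomicInteger fixedRateRuns = new AtomicInteger();
        // Like ServerListUpdater: initial delay 0, fixed period (5000 ms in Nacos, 10 ms here)
        executor.scheduleAtFixedRate(fixedRateRuns::incrementAndGet, 0, 10, TimeUnit.MILLISECONDS);
        // Like ServerStatusReporter: first run after an initial delay, then self-rescheduling
        SelfReschedulingTask reporter = new SelfReschedulingTask(executor, () -> 10L);
        executor.schedule(reporter, 20, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return new int[] {fixedRateRuns.get(), reporter.runs.get()};
    }

    public static void main(String[] args) {
        int[] counts = runFor(200);
        System.out.println("fixed-rate runs: " + counts[0] + ", self-rescheduling runs: " + counts[1]);
    }
}
```

With scheduleAtFixedRate, an exception thrown by one run suppresses all later runs, which is why ServerListUpdater wraps its body in try/catch. Rescheduling from finally avoids that problem and also lets a changed period take effect on the next run without re-registering the task.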
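- ServerListManager.ServerListUpdater periodically refreshes the local node's view of the cluster node list
/**
* Inner class of ServerListManager; periodically refreshes the local view of the cluster node list
*/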
public class ServerListUpdater implements Runnable {
@Override
public void run() {
try {
/**
* refreshServerList():
* 1. In STANDALONE_MODE, returns only the current node
* 2. Otherwise reads the node list from the cluster.conf file (readClusterConf())
* 3. If that is empty, reads the node list from System.getenv("naming_self_service_cluster_ips")
*/
List<Server> refreshedServers = refreshServerList();
List<Server> oldServers = servers;
if (CollectionUtils.isEmpty(refreshedServers)) {
Loggers.RAFT.warn("refresh server list failed, ignore it.");
return;
}
boolean changed = false;
//Nodes newly added to the cluster
List<Server> newServers = (List<Server>) CollectionUtils.subtract(refreshedServers, oldServers);
if (CollectionUtils.isNotEmpty(newServers)) {
servers.addAll(newServers);
changed = true;
}
//Nodes removed from the cluster
List<Server> deadServers = (List<Server>) CollectionUtils.subtract(oldServers, refreshedServers);
if (CollectionUtils.isNotEmpty(deadServers)) {
servers.removeAll(deadServers);
changed = true;
}
//If the set of server nodes changed, notify the registered listeners
if (changed) {
notifyListeners();
}
} catch (Exception e) {
Loggers.RAFT.info("error while updating server list.", e);
}
}
}
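The add/remove detection in ServerListUpdater amounts to two list differences. The sketch below reproduces it with plain java.util lists instead of Apache Commons Collections' CollectionUtils.subtract, using ip:port strings in place of Server objects; ServerListDiffSketch is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;

class ServerListDiffSketch {

    /** Nodes that appeared and disappeared between two refreshes. */
    static final class Diff {
        final List<String> added;
        final List<String> removed;

        Diff(List<String> added, List<String> removed) {
            this.added = added;
            this.removed = removed;
        }

        boolean changed() {
            return !added.isEmpty() || !removed.isEmpty();
        }
    }

    static Diff diff(List<String> oldServers, List<String> refreshedServers) {
        List<String> added = new ArrayList<>(refreshedServers);
        added.removeAll(oldServers);          // only in the new list: came online
        List<String> removed = new ArrayList<>(oldServers);
        removed.removeAll(refreshedServers);  // only in the old list: went offline
        return new Diff(added, removed);
    }

    /** Updates servers in place like ServerListUpdater; returns true when listeners would be notified. */
    static boolean apply(List<String> servers, List<String> refreshedServers) {
        if (refreshedServers == null || refreshedServers.isEmpty()) {
            return false; // refresh failed: keep the current list untouched
        }
        Diff diff = diff(servers, refreshedServers);
        servers.addAll(diff.added);
        servers.removeAll(diff.removed);
        return diff.changed();
    }

    public static void main(String[] args) {
        List<String> servers = new ArrayList<>(List.of("10.0.0.1:8848", "10.0.0.2:8848"));
        boolean changed = apply(servers, List.of("10.0.0.2:8848", "10.0.0.3:8848"));
        System.out.println("changed=" + changed + ", servers=" + servers);
    }
}
```

Running main prints `changed=true, servers=[10.0.0.2:8848, 10.0.0.3:8848]`. Note that removeAll drops every occurrence of an element, whereas CollectionUtils.subtract removes one occurrence per match; the two agree as long as the lists contain no duplicates.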
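- ServerListManager.ServerStatusReporter periodically sends heartbeats to the other nodes in the cluster
Message format: SITE#IP:port#currentTimeMillis#weight\r\n, where weight is half the number of available processors (at least 1)
/**
* Inner class of ServerListManager; periodically sends heartbeats to the other nodes in the cluster
*/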
private class ServerStatusReporter implements Runnable {
@Override
public void run() {
try {
//Refresh each node's alive flag and the healthy server list from the recorded heartbeats
checkDistroHeartbeat();
int weight = Runtime.getRuntime().availableProcessors() / 2;
if (weight <= 0) weight = 1;
long curTime = System.currentTimeMillis();
String status = LOCALHOST_SITE + "#" + NetUtils.localServer() + "#" + curTime + "#" + weight + "\r\n";
//send status to itself
onReceiveServerStatus(status);
//Send the heartbeat to every node in the cluster except this one
List<Server> allServers = getServers();
for (Server server : allServers) {
if (server.getKey().equals(NetUtils.localServer())) {
continue;
}
Message msg = new Message();
msg.setData(status);
synchronizer.send(server.getKey(), msg);
}
} catch (Exception e) {
Loggers.SRV_LOG.error("[SERVER-STATUS] Exception while sending server status", e);
} finally {
GlobalExecutor.registerServerStatusReporter(this, switchDomain.getServerStatusSynchronizationPeriodMillis());
}
}
}
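A minimal sketch of how ServerStatusReporter builds its heartbeat line and chooses its targets, with ip:port strings standing in for Server objects; the site value "site-a" and the class name HeartbeatMessageSketch are placeholders:

```java
import java.util.ArrayList;
import java.util.List;

class HeartbeatMessageSketch {

    /** Weight = half the available processors, never below 1 (same rule as ServerStatusReporter). */
    static int weight(int availableProcessors) {
        return Math.max(1, availableProcessors / 2);
    }

    /** Builds one heartbeat line: SITE#IP:port#currentTimeMillis#weight\r\n */
    static String build(String site, String ipPort, long timeMillis, int weight) {
        return site + "#" + ipPort + "#" + timeMillis + "#" + weight + "\r\n";
    }

    /** Every known node except the local one receives the heartbeat. */
    static List<String> targets(List<String> allServers, String localServer) {
        List<String> result = new ArrayList<>();
        for (String server : allServers) {
            if (!server.equals(localServer)) {
                result.add(server);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        String local = "10.0.0.1:8848";
        String status = build("site-a", local, System.currentTimeMillis(),
                weight(Runtime.getRuntime().availableProcessors()));
        for (String target : targets(List.of(local, "10.0.0.2:8848", "10.0.0.3:8848"), local)) {
            System.out.print("to " + target + ": " + status);
        }
    }
}
```

The local node is skipped in the send loop because the reporter already delivers the same line to itself through onReceiveServerStatus.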
/**
* Report local server status to other server
* @author nacos
*/
public class ServerStatusSynchronizer implements Synchronizer {
@Override
public void send(final String serverIP, Message msg) {
final Map<String, String> params = new HashMap<String, String>(2);
params.put("serverStatus", msg.getData());
//Report endpoint; serverIP is the node key in ip:port form, e.g. 192.168.1.10:8848
String url = "http://" + serverIP + "/nacos/v1/ns/operator/server/status";
HttpClient.asyncHttpGet(url, null, params, new AsyncCompletionHandler<Integer>() {
@Override
public Integer onCompleted(Response response) throws Exception {
if (response.getStatusCode() != HttpURLConnection.HTTP_OK) {
return 1;
}
return 0;
}
});
}
}
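For comparison, the same fire-and-forget report could be written with the JDK's java.net.http.HttpClient (Java 11+) instead of Nacos's internal HttpClient helper. This sketch assumes the default /nacos context path and a serverKey in ip:port form; it URL-encodes the status because a raw '#' would start a URL fragment and raw CR/LF characters are not allowed in a URI. StatusSenderSketch is a hypothetical name:

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

class StatusSenderSketch {

    // Created lazily so that building URLs does not start an HTTP client
    private static final class ClientHolder {
        static final HttpClient CLIENT = HttpClient.newHttpClient();
    }

    /** serverKey is assumed to be "ip:port"; the "/nacos" context path is assumed to be the default. */
    static String buildUrl(String serverKey, String status) {
        return "http://" + serverKey + "/nacos/v1/ns/operator/server/status?serverStatus="
                + URLEncoder.encode(status, StandardCharsets.UTF_8);
    }

    /** Sends without blocking; completes with 0 on HTTP 200 and 1 on any other status or I/O failure. */
    static CompletableFuture<Integer> send(String serverKey, String status) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(buildUrl(serverKey, status))).GET().build();
        return ClientHolder.CLIENT.sendAsync(request, HttpResponse.BodyHandlers.discarding())
                .thenApply(response -> response.statusCode() == 200 ? 0 : 1)
                .exceptionally(error -> 1);
    }
}
```

As in the original handler, the result code is only informational: a lost heartbeat is tolerated, because the receiving node judges liveness from the gap between heartbeats rather than from any single request.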
- How the local node handles a heartbeat received from another node (main code)
@RequestMapping("/server/status")
public String serverStatus(@RequestParam String serverStatus) {
serverListManager.onReceiveServerStatus(serverStatus);
return "ok";
}
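public synchronized void onReceiveServerStatus(String config) {
List<Server> tmpServerList = new ArrayList<>();
//Format: site#ip:port#lastReportTime#weight; trim() strips the trailing \r\n so the weight field parses
String[] params = config.trim().split("#");
if (params.length < 3) {
return;
}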
Server server = new Server();
server.setSite(params[0]);
server.setIp(params[1].split(UtilsAndCommons.IP_PORT_SPLITER)[0]);
server.setServePort(Integer.parseInt(params[1].split(UtilsAndCommons.IP_PORT_SPLITER)[1]));
server.setLastRefTime(Long.parseLong(params[2]));
//If the interval between two consecutive reports from a server exceeds distroServerExpiredMillis, the server is considered expired
Long lastBeat = distroBeats.get(server.getKey());
long now = System.currentTimeMillis();
if (null != lastBeat) {
server.setAlive(now - lastBeat < switchDomain.getDistroServerExpiredMillis());
}
//Record this node's latest heartbeat time
distroBeats.put(server.getKey(), now);
Date date = new Date(Long.parseLong(params[2]));
server.setLastRefTimeStr(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date));
server.setWeight(params.length == 4 ? Integer.parseInt(params[3]) : 1);
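//If this node is already in its site's list, replace the entry with the new data
List<Server> list = distroConfig.get(server.getSite());
if (list == null) {
list = new ArrayList<>();
}
for (Server s : list) {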
String serverId = s.getKey() + "_" + s.getSite();
String newServerId = server.getKey() + "_" + server.getSite();
if (serverId.equals(newServerId)) {
tmpServerList.add(server);
continue;
}
tmpServerList.add(s);
}
//If it was not in the list, append it
if (!tmpServerList.contains(server)) {
tmpServerList.add(server);
}
distroConfig.put(server.getSite(), tmpServerList);
}
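The receive path boils down to: parse the line, compare the gap since the previous beat from the same node against distroServerExpiredMillis, record the new beat, and upsert the node into its site's list. A simplified sketch with a hypothetical Node class in place of Server; the receive time is passed in as a parameter to keep it testable, and a node is marked not alive on its first beat because there is no earlier beat to compare against:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReceiveStatusSketch {

    /** Hypothetical stand-in for com.alibaba.nacos.naming.cluster.servers.Server. */
    static final class Node {
        final String site;
        final String ip;
        final int port;
        final long lastRefTime;
        final int weight;
        final boolean alive;

        Node(String site, String ip, int port, long lastRefTime, int weight, boolean alive) {
            this.site = site;
            this.ip = ip;
            this.port = port;
            this.lastRefTime = lastRefTime;
            this.weight = weight;
            this.alive = alive;
        }

        String key() {
            return ip + ":" + port;
        }
    }

    private final long expiredMillis;                              // plays distroServerExpiredMillis
    private final Map<String, Long> beats = new HashMap<>();        // plays distroBeats
    private final Map<String, List<Node>> bySite = new HashMap<>(); // plays distroConfig

    ReceiveStatusSketch(long expiredMillis) {
        this.expiredMillis = expiredMillis;
    }

    /** Handles one heartbeat line received at local time now; returns the parsed node, or null if malformed. */
    synchronized Node onReceive(String status, long now) {
        String[] params = status.trim().split("#");
        if (params.length < 3) {
            return null;
        }
        String[] ipPort = params[1].split(":");
        String ip = ipPort[0];
        int port = Integer.parseInt(ipPort[1]);
        String key = ip + ":" + port;
        Long lastBeat = beats.get(key);
        // Alive only if the previous beat from this node arrived less than expiredMillis ago
        boolean alive = lastBeat != null && now - lastBeat < expiredMillis;
        beats.put(key, now);
        int weight = params.length == 4 ? Integer.parseInt(params[3]) : 1;
        Node node = new Node(params[0], ip, port, Long.parseLong(params[2]), weight, alive);

        // Upsert into the site's list: replace an entry with the same key, otherwise append
        List<Node> updated = new ArrayList<>();
        boolean replaced = false;
        for (Node existing : bySite.getOrDefault(node.site, List.of())) {
            if (existing.key().equals(key)) {
                updated.add(node);
                replaced = true;
            } else {
                updated.add(existing);
            }
        }
        if (!replaced) {
            updated.add(node);
        }
        bySite.put(node.site, updated);
        return node;
    }

    synchronized List<Node> nodesOf(String site) {
        return bySite.getOrDefault(site, List.of());
    }
}
```

Note that liveness here measures the gap between two heartbeats as seen by the receiver's clock, so the sender's timestamp (lastRefTime) is stored for display only and clock skew between nodes does not affect the decision.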
- Checking heartbeat timestamps to decide whether each node is alive
private void checkDistroHeartbeat() {
List<Server> newHealthyList = new ArrayList<>(servers.size());
long now = System.currentTimeMillis();
for (Server s: servers) {
Long lastBeat = distroBeats.get(s.getKey());
if (null == lastBeat) {
continue;
}
s.setAlive(now - lastBeat < switchDomain.getDistroServerExpiredMillis());
}
//local site servers
List<String> allLocalSiteSrvs = new ArrayList<>();
for (Server server : servers) {
server.setAdWeight(switchDomain.getAdWeight(server.getKey()) == null ? 0 : switchDomain.getAdWeight(server.getKey()));
for (int i = 0; i < server.getWeight() + server.getAdWeight(); i++) {
if (!allLocalSiteSrvs.contains(server.getKey())) {
allLocalSiteSrvs.add(server.getKey());
}
if (server.isAlive() && !newHealthyList.contains(server)) {
newHealthyList.add(server);
}
}
}
Collections.sort(newHealthyList);
if (!CollectionUtils.isEqualCollection(healthyServers, newHealthyList)) {
healthyServers = newHealthyList;
notifyListeners();
}
}
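Stripped of weights, checkDistroHeartbeat reduces to one rule: a node is healthy if its last beat is younger than distroServerExpiredMillis, and the sorted healthy list replaces the old one, with listeners notified, only when its contents change. A simplified sketch over ip:port strings; unlike the code above, which leaves a node's alive flag untouched when it has no recorded beat, this sketch treats such a node as unhealthy. HealthCheckSketch is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HealthCheckSketch {
    private List<String> healthyServers = new ArrayList<>();
    private int notifications = 0; // counts the places where notifyListeners() would be called

    /** Recomputes the healthy list; returns true if it changed. */
    boolean check(List<String> servers, Map<String, Long> beats, long now, long expiredMillis) {
        List<String> newHealthy = new ArrayList<>(servers.size());
        for (String server : servers) {
            Long lastBeat = beats.get(server);
            boolean alive = lastBeat != null && now - lastBeat < expiredMillis;
            if (alive && !newHealthy.contains(server)) {
                newHealthy.add(server);
            }
        }
        Collections.sort(newHealthy);
        // Both lists are sorted and duplicate-free, so equals() matches an order-insensitive comparison
        if (newHealthy.equals(healthyServers)) {
            return false;
        }
        healthyServers = newHealthy;
        notifications++;
        return true;
    }

    List<String> healthyServers() {
        return Collections.unmodifiableList(healthyServers);
    }

    int notifications() {
        return notifications;
    }

    public static void main(String[] args) {
        HealthCheckSketch sketch = new HealthCheckSketch();
        Map<String, Long> beats = new HashMap<>();
        beats.put("10.0.0.1:8848", 9_500L);
        sketch.check(List.of("10.0.0.1:8848", "10.0.0.2:8848"), beats, 10_000L, 3_000L);
        System.out.println("healthy: " + sketch.healthyServers());
    }
}
```

Comparing before replacing keeps listeners from being woken on every 2-second reporter run when cluster membership is stable.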
- ServerStatusManager.ServerStatusUpdater checks and controls the working status of the local server
public class ServerStatusUpdater implements Runnable {
@Override
public void run() {
refreshServerStatus();
}
}
private void refreshServerStatus() {
if (StringUtils.isNotBlank(switchDomain.getOverriddenServerStatus())) {
serverStatus = ServerStatus.valueOf(switchDomain.getOverriddenServerStatus());
return;
}
if (consistencyService.isAvailable()) {
serverStatus = ServerStatus.UP;
} else {
serverStatus = ServerStatus.DOWN;
}
}
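The decision in refreshServerStatus is a two-step rule: an operator override wins; otherwise the status follows the consistency service. A minimal sketch with suppliers standing in for switchDomain.getOverriddenServerStatus() and consistencyService.isAvailable(); the enum models only the three values used in this chapter, and ServerStatusSketch is a hypothetical name:

```java
import java.util.function.BooleanSupplier;
import java.util.function.Supplier;

class ServerStatusSketch {

    enum ServerStatus { STARTING, UP, DOWN }

    private final Supplier<String> overriddenStatus;    // stands in for switchDomain.getOverriddenServerStatus()
    private final BooleanSupplier consistencyAvailable; // stands in for consistencyService.isAvailable()
    private volatile ServerStatus status = ServerStatus.STARTING;

    ServerStatusSketch(Supplier<String> overriddenStatus, BooleanSupplier consistencyAvailable) {
        this.overriddenStatus = overriddenStatus;
        this.consistencyAvailable = consistencyAvailable;
    }

    /** One run of the periodic updater. */
    void refresh() {
        String override = overriddenStatus.get();
        if (override != null && !override.isBlank()) {
            status = ServerStatus.valueOf(override); // an operator override always wins
            return;
        }
        status = consistencyAvailable.getAsBoolean() ? ServerStatus.UP : ServerStatus.DOWN;
    }

    ServerStatus status() {
        return status;
    }
}
```

The status starts as STARTING and stays there until the first run of the updater, which in Nacos happens immediately since the task is registered with an initial delay of 0.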