最近生产环境中 xxl-job 框架的一个执行器(Nacos 客户端)由于分配的内存较小,频繁与 Nacos 服务端断开连接;断开之后,虽然客户端服务进程并没有宕掉,却始终无法重新注册到 Nacos 服务端上去。
基于以上情况,我试着从 Nacos 客户端注册与心跳检测这两个方面跟一下源码。
1:首先打开 Nacos 官网,查找心跳相关的介绍:
https://nacos.io/zh-cn/docs/what-is-nacos.html ,在 Open API 模块中找到“发送实例心跳”接口
然后使用 IDEA 打开 Nacos 源码(可从 GitHub 上下载),全局搜索 /instance/beat,找到客户端发送心跳的方法:
1 public JSONObject sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {
2
3 if (NAMING_LOGGER.isDebugEnabled()) {
4 NAMING_LOGGER.debug("[BEAT] {} sending beat to server: {}", namespaceId, beatInfo.toString());
5 }
6 Map<String, String> params = new HashMap<String, String>(8);
7 String body = StringUtils.EMPTY;
8 if (!lightBeatEnabled) {
9 try {
10 body = "beat=" + URLEncoder.encode(JSON.toJSONString(beatInfo), "UTF-8");
11 } catch (UnsupportedEncodingException e) {
12 throw new NacosException(NacosException.SERVER_ERROR, "encode beatInfo error", e);
13 }
14 }
15 params.put(CommonParams.NAMESPACE_ID, namespaceId);
16 params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
17 params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
18 params.put("ip", beatInfo.getIp());
19 params.put("port", String.valueOf(beatInfo.getPort()));
20 String result = reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/beat", params, body, HttpMethod.PUT);
21 return JSON.parseObject(result);
22 }
然后在服务端源码中找到 Nacos 服务端接收到客户端心跳后的处理逻辑,如下:
1 @CanDistro
2 @PutMapping("/beat")
3 @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
4 public JSONObject beat(HttpServletRequest request) throws Exception {
5
6 JSONObject result = new JSONObject();
7
8 result.put("clientBeatInterval", switchDomain.getClientBeatInterval());
9 String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
10 String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID,
11 Constants.DEFAULT_NAMESPACE_ID);
12 String clusterName = WebUtils.optional(request, CommonParams.CLUSTER_NAME,
13 UtilsAndCommons.DEFAULT_CLUSTER_NAME);
14 String ip = WebUtils.optional(request, "ip", StringUtils.EMPTY);
15 int port = Integer.parseInt(WebUtils.optional(request, "port", "0"));
16 String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY);
17
18 RsInfo clientBeat = null;
19 if (StringUtils.isNotBlank(beat)) {
20 clientBeat = JSON.parseObject(beat, RsInfo.class);
21 }
22
23 if (clientBeat != null) {
24 if (StringUtils.isNotBlank(clientBeat.getCluster())) {
25 clusterName = clientBeat.getCluster();
26 } else {
27 // fix #2533
28 clientBeat.setCluster(clusterName);
29 }
30 ip = clientBeat.getIp();
31 port = clientBeat.getPort();
32 }
33
34 if (Loggers.SRV_LOG.isDebugEnabled()) {
35 Loggers.SRV_LOG.debug("[CLIENT-BEAT] full arguments: beat: {}, serviceName: {}", clientBeat, serviceName);
36 }
37
38 Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);
39
40 if (instance == null) {
41 if (clientBeat == null) {
42 result.put(CommonParams.CODE, NamingResponseCode.RESOURCE_NOT_FOUND);
43 return result;
44 }
45 instance = new Instance();
46 instance.setPort(clientBeat.getPort());
47 instance.setIp(clientBeat.getIp());
48 instance.setWeight(clientBeat.getWeight());
49 instance.setMetadata(clientBeat.getMetadata());
50 instance.setClusterName(clusterName);
51 instance.setServiceName(serviceName);
52 instance.setInstanceId(instance.getInstanceId());
53 instance.setEphemeral(clientBeat.isEphemeral());
54
55 serviceManager.registerInstance(namespaceId, serviceName, instance);
56 }
57
58 Service service = serviceManager.getService(namespaceId, serviceName);
59
60 if (service == null) {
61 throw new NacosException(NacosException.SERVER_ERROR,
62 "service not found: " + serviceName + "@" + namespaceId);
63 }
64 if (clientBeat == null) {
65 clientBeat = new RsInfo();
66 clientBeat.setIp(ip);
67 clientBeat.setPort(port);
68 clientBeat.setCluster(clusterName);
69 }
70 service.processClientBeat(clientBeat);
71
72 result.put(CommonParams.CODE, NamingResponseCode.OK);
73 result.put("clientBeatInterval", instance.getInstanceHeartBeatInterval());
74 result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
75 return result;
76 }
接下来,我们跟一下 service.processClientBeat(clientBeat) 这个方法,看看里面的业务逻辑到底是怎样的。
/**
 * Wraps the received beat in a {@code ClientBeatProcessor} bound to this service and
 * submits it through {@code HealthCheckReactor.scheduleNow}.
 *
 * @param rsInfo the heartbeat data received from the client
 */
public void processClientBeat(final RsInfo rsInfo) {
    ClientBeatProcessor processor = new ClientBeatProcessor();
    processor.setService(this);
    processor.setRsInfo(rsInfo);

    HealthCheckReactor.scheduleNow(processor);
}
再跟进 ClientBeatProcessor 的 run 方法:
/**
 * Applies one client beat to the matching instance(s) of the beat's cluster.
 *
 * <p>Every instance whose ip and port equal those of the beat gets its last-beat timestamp
 * set to the current time. If such an instance is not marked and currently unhealthy, it is
 * switched back to healthy and the push service is told that the service changed.
 */
@Override
public void run() {
    Service owner = this.service;
    if (Loggers.EVT_LOG.isDebugEnabled()) {
        Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString());
    }

    String ip = rsInfo.getIp();
    String clusterName = rsInfo.getCluster();
    int port = rsInfo.getPort();

    Cluster cluster = owner.getClusterMap().get(clusterName);
    List<Instance> candidates = cluster.allIPs(true);

    for (Instance candidate : candidates) {
        // Only the instance(s) identified by ip:port are affected by this beat.
        if (!candidate.getIp().equals(ip) || candidate.getPort() != port) {
            continue;
        }

        if (Loggers.EVT_LOG.isDebugEnabled()) {
            Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString());
        }
        candidate.setLastBeat(System.currentTimeMillis());

        // Marked instances keep their health state; healthy ones need no change.
        if (!candidate.isMarked() && !candidate.isHealthy()) {
            candidate.setHealthy(true);
            Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok",
                cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE);
            getPushService().serviceChanged(owner);
        }
    }
}
到这里可以发现,Nacos 服务端接收到客户端服务实例的心跳后,主要做了两件事:一是把该实例的最近心跳时间(lastBeat)更新为当前时间;二是如果该实例当前处于不健康状态(且未被标记),就把它重新置为健康,并通知推送服务该服务发生了变化。之后我又去 Nacos 相关的数据库表里面看了一下相关信息。
接着看一下服务端在心跳超时后删除实例的逻辑:
/**
 * Checks the heartbeat age of every instance of the service in two passes.
 *
 * <p>The task does nothing when the distro mapper does not report this service as its
 * responsibility, or when health checking is switched off.
 *
 * <p>Pass 1: an instance whose last beat is older than its heartbeat timeout, and which is
 * neither marked nor already unhealthy, is set to unhealthy; the push service is notified
 * and an {@code InstanceHeartbeatTimeoutEvent} is published.
 *
 * <p>Pass 2 (only when instance expiry is enabled in the global config): an unmarked
 * instance whose last beat is older than its delete timeout is removed via
 * {@code deleteIP}.
 *
 * <p>Any exception is logged as a warning and not rethrown.
 */
@Override
public void run() {
    try {
        if (!getDistroMapper().responsible(service.getName())) {
            return;
        }
        if (!getSwitchDomain().isHealthCheckEnabled()) {
            return;
        }

        List<Instance> allInstances = service.allIPs(true);

        // first set health status of instances:
        for (Instance candidate : allInstances) {
            boolean beatTimedOut = System.currentTimeMillis() - candidate.getLastBeat()
                > candidate.getInstanceHeartBeatTimeOut();
            // Skip instances that are on time, marked, or already flagged unhealthy.
            if (!beatTimedOut || candidate.isMarked() || !candidate.isHealthy()) {
                continue;
            }

            candidate.setHealthy(false);
            Loggers.EVT_LOG.info("{POS} {IP-DISABLED} valid: {}:{}@{}@{}, region: {}, msg: client timeout after {}, last beat: {}",
                candidate.getIp(), candidate.getPort(), candidate.getClusterName(), service.getName(),
                UtilsAndCommons.LOCALHOST_SITE, candidate.getInstanceHeartBeatTimeOut(), candidate.getLastBeat());
            getPushService().serviceChanged(service);
            SpringContext.getAppContext().publishEvent(new InstanceHeartbeatTimeoutEvent(this, candidate));
        }

        if (!getGlobalConfig().isExpireInstance()) {
            return;
        }

        // then remove obsolete instances:
        for (Instance candidate : allInstances) {
            if (!candidate.isMarked()
                && System.currentTimeMillis() - candidate.getLastBeat() > candidate.getIpDeleteTimeout()) {
                // delete instance
                Loggers.SRV_LOG.info("[AUTO-DELETE-IP] service: {}, ip: {}", service.getName(), JSON.toJSONString(candidate));
                deleteIP(candidate);
            }
        }
    } catch (Exception e) {
        Loggers.SRV_LOG.warn("Exception while processing client beat time out.", e);
    }
}
