上面这张图我在写eureka时就画过,这里拿来用下,因为学习同类型东西就是要学会找相同点和不同点,其实Eureka和nacos的注册和消费流程都是一样的,不同点其实就两块,第一块Eureka在数据同步时没有选举的机制,第二点在数据同步时通知客户端的方式不同;
Nacos服务注册需要具备的能力
- 服务提供者把自己的协议地址注册到Nacos server
- 服务消费者需要从Nacos Server上去查询服务提供者的地址(根据服务名称)
- Nacos Server需要感知到服务提供者的上下线的变化
- 服务消费者需要动态感知到Nacos Server端服务地址的变化
服务注册
springboot和nacos的结合
<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId> </dependency> <dependency> <groupId>com.alibaba.boot</groupId> <artifactId>nacos-config-spring-boot-starter</artifactId> <version>0.2.7</version> </dependency> <dependency> <groupId>com.alibaba.boot</groupId> <artifactId>nacos-discovery-spring-boot-starter</artifactId> <version>0.2.7</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> </dependencies>
application.properties
# 应用名称 spring.application.name=springboot-nacos management.endpoints.jmx.exposure.include=* management.endpoints.web.exposure.include=* management.endpoint.health.show-details=always alibaba.cloud.access-key=**** alibaba.cloud.secret-key=**** nacos.discovery.server-addr=localhost:8848
服务注册和查询的代码
@RestController public class TestController { @NacosValue ( value = "${info:默认值}",autoRefreshed = true) private String info; @NacosInjected private NamingService namingService; //服务注册功能的Service //查询注册信息 @GetMapping("/get") public String get() throws NacosException { // return info; return namingService.getAllInstances ( "SpringBoot-Nacos" ).toString (); } //注册的代码 @PostMapping("/registry") public String registry() throws NacosException { //通过Instance注册服务 Instance instance=new Instance (); instance.setClusterName("TestCluster"); //集群名称 instance.setEnabled(true); //是否启用 instance.setEphemeral(true); //临时节点/持久化节点, CP(Raft), AP(Distro) instance.setIp("localhost"); instance.setPort(8848); instance.setWeight(10); //1~100 namingService.registerInstance("SpringBoot-Nacos",instance); return "SUCCESS"; } }
nacos的实现原理
Nacos的源码分析之服务注册的流程
Dubbo服务注册的流程有两个,一个是和之前分析Eureka源码时的路径一样(参考Eureka源码分析)另一个是基于Dubbo本身提供的自动装配机制,而在基于Dubbo服务发布的过程中,是走的事件监听机制,在DubboServiceRegistrationNonWebApplicationAutoConfiguration这个类中,这个类会监听ApplicationStartedEvent事件,这个时间是spring boot在2.0新增的,就是当spring boot应用启动完成之后会发布这个时间。而此时监听到这个事件之后,会触发注册的动作。
//springboot启动完成时会发布@EventListener({ApplicationStartedEvent.class})
@EventListener({ApplicationStartedEvent.class}) public void onApplicationStarted() { this.setServerPort(); this.register(); } private void register() { if (!this.registered) { this.serviceRegistry.register(this.registration); this.registered = true; } }
this.serviceRegistry。 是一个注入的实例: NacosServiceRegistry
NacosServiceRegistry.register
serviceId , 对应当前应用的application.name
group,表示nacos上的分组配置
instance,表示服务实例信息
public void register(Registration registration) { if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..."); } else { String serviceId = registration.getServiceId(); String group = this.nacosDiscoveryProperties.getGroup(); Instance instance = this.getNacosInstanceFromRegistration(registration); try { this.namingService.registerInstance(serviceId, group, instance); log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()}); } catch (Exception var6) { log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var6}); ReflectionUtils.rethrowRuntimeException(var6); } } }
NacosNamingService.registerInstance
开始注册实例,主要做两个动作
- 如果当前注册的是临时节点,则构建心跳信息,通过beat反应堆来构建心跳任务
- 调用registerService发起服务注册
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { if (instance.isEphemeral()) { //是否是临时节点,如果是临时节点,则构建心跳信息 BeatInfo beatInfo = new BeatInfo(); beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName)); beatInfo.setIp(instance.getIp()); beatInfo.setPort(instance.getPort()); beatInfo.setCluster(instance.getClusterName()); beatInfo.setWeight(instance.getWeight()); beatInfo.setMetadata(instance.getMetadata()); beatInfo.setScheduled(false); beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());
//beatReactor,添加心跳信息进行处理 this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo); } this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); }
registerService
- 代码逻辑很简单,构建请求参数
- 发起请求
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance}); Map<String, String> params = new HashMap(9); params.put("namespaceId", this.namespaceId); params.put("serviceName", serviceName); params.put("groupName", groupName); params.put("clusterName", instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JSON.toJSONString(instance.getMetadata())); this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, "POST"); }
reqAPI
发起服务注册请求。
- api: nacos server open api
- params: 请求参数
- body:
- method: 请求方法类型
- servers: nacos server地址
public String reqAPI(String api, Map<String, String> params, String body, List<String> servers, String method) throws NacosException { params.put(CommonParams.NAMESPACE_ID, getNamespaceId()); if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) { throw new NacosException(NacosException.INVALID_PARAM, "no server available"); } NacosException exception = new NacosException(); //如果服务地址不为空 if (servers != null && !servers.isEmpty()) { //随机获取一台服务器节点 Random random = new Random(System.currentTimeMillis()); int index = random.nextInt(servers.size()); //遍历服务列表 for (int i = 0; i < servers.size(); i++) { String server = servers.get(index); //获取索引位置的服务节点 try {
//调用指定服务 return callServer(api, params, body, server, method); } catch (NacosException e) { exception = e; if (NAMING_LOGGER.isDebugEnabled()) { NAMING_LOGGER.debug("request {} failed.", server, e); } }
//轮询 index = (index + 1) % servers.size(); } } if (StringUtils.isNotBlank(nacosDomain)) { for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) { try { return callServer(api, params, body, nacosDomain, method); } catch (NacosException e) { exception = e; if (NAMING_LOGGER.isDebugEnabled()) { NAMING_LOGGER.debug("request {} failed.", nacosDomain, e); } } } } NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(), exception.getErrMsg()); throw new NacosException(exception.getErrCode(), "failed to req API:/api/" + api + " after all servers(" + servers + ") tried: " + exception.getMessage()); }
callServer
发起服务调用
public String callServer(String api, Map<String, String> params, String body, String curServer, String method) throws NacosException { long start = System.currentTimeMillis(); long end = 0;
//添加签名信息 injectSecurityInfo(params);
//添加头信息 List<String> headers = builderHeaders(); String url; //拼接url地址 if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) { url = curServer + api; } else { if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) { curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort; } url = HttpClient.getPrefix() + curServer + api; } //通过httpclient发起请求 HttpClient.HttpResult result = HttpClient.request(url, headers, params, body, UtilAndComs.ENCODING, method); end = System.currentTimeMillis(); MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code)) .observe(end - start); if (HttpURLConnection.HTTP_OK == result.code) { //返回服务端的结果 return result.content; } if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) { return StringUtils.EMPTY; } throw new NacosException(result.code, result.content); }
request
发起请求,获得结果
public static HttpResult request(String url, List<String> headers, Map<String, String> paramValues, String body, String encoding, String method) { HttpURLConnection conn = null; try { String encodedContent = encodingParams(paramValues, encoding); url += (StringUtils.isEmpty(encodedContent)) ? "" : ("?" + encodedContent); conn = (HttpURLConnection) new URL(url).openConnection(); setHeaders(conn, headers, encoding); conn.setConnectTimeout(CON_TIME_OUT_MILLIS); conn.setReadTimeout(TIME_OUT_MILLIS); conn.setRequestMethod(method); conn.setDoOutput(true); if (StringUtils.isNotBlank(body)) { byte[] b = body.getBytes(); conn.setRequestProperty("Content-Length", String.valueOf(b.length)); conn.getOutputStream().write(b, 0, b.length); conn.getOutputStream().flush(); conn.getOutputStream().close(); } conn.connect(); if (NAMING_LOGGER.isDebugEnabled()) { NAMING_LOGGER.debug("Request from server: " + url); } return getResult(conn); } catch (Exception e) { try { if (conn != null) { NAMING_LOGGER.warn("failed to request " + conn.getURL() + " from " + InetAddress.getByName(conn.getURL().getHost()).getHostAddress()); } } catch (Exception e1) { NAMING_LOGGER.error("[NA] failed to request ", e1); //ignore } NAMING_LOGGER.error("[NA] failed to request ", e); return new HttpResult(500, e.toString(), Collections.<String, String>emptyMap()); } finally { IoUtils.closeQuietly(conn); } }
Nacos服务端的处理
服务端提供了一个InstanceController类,在这个类中提供了服务注册相关的API,而服务端发起初测时,调用的接口是: [post]: /nacos/v1/ns/instance
- serviceName: 代表客户端的项目名称
- namespace: nacos 的namespace
@RestController @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance") public class InstanceController { @CanDistro @PostMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception { final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); //从请求中解析出instance 实例 final Instance instance = parseInstance(request); serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok"; } }
ServiceManager.registerInstance
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { //创建一个空服务,在Nacos控制台服务列表展示的服务信息,实际上是初始化一个serviceMap,它 是一个ConcurrentHashMap集合 createEmptyService(namespaceId, serviceName, instance.isEphemeral()); //从serviceMap中,根据namespaceId和serviceName得到一个服务对象 Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } //调用addInstance创建一个服务实例 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }
createServiceIfAbsent
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException { //从serviceMap中获取服务对象 Service service = getService(namespaceId, serviceName); if (service == null) { Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); service = new Service(); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(NamingUtils.getGroupName(serviceName)); // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); if (cluster != null) {//如果为空。则初始化 cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster); } service.validate(); putServiceAndInit(service); if (!local) { addOrReplaceService(service); } } }
putServiceAndInit
private void putServiceAndInit(Service service) throws NacosException { putService(service);//把服务信息保存到serviceMap集合 service = getService(service.getNamespaceId(), service.getName()); service.init();//建立心跳检测机制 //实现数据一致性监听,ephemeral=true表示采用raft协议,false表示采用Distro consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson()); }
addInstance
向上返回,然后进入addInstance方法,把服务实例添加到集合中,然后基于一致性协议进行数据的同步。
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); Service service = getService(namespaceId, serviceName); synchronized (service) { List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); consistencyService.put(key, instances); } }
消费者的服务查询
服务注册成功之后,消费者就可以从nacos server中获取到服务提供者的地址,然后进行服务的调用。在服务消费中,有一个核心的类 NacosDiscoveryClient 来负责和nacos交互,去获得服务提供者的地址信息。前置的具体的流程就不在这里复述了,之前在讲dubbo源码的时候已经分析过服务的订阅过程。NacosDiscoveryClient 中提供了一个 getInstances 方法用来根据服务提供者名称获取服务提供者的url地址的方法.
客户端启动获取服务列表
NacosDiscoveryClient.getInstances
public class NacosDiscoveryClient implements DiscoveryClient { private static final Logger log = LoggerFactory.getLogger(NacosDiscoveryClient.class); public static final String DESCRIPTION = "Spring Cloud Nacos Discovery Client"; private NacosServiceDiscovery serviceDiscovery; public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) { this.serviceDiscovery = nacosServiceDiscovery; } public String description() { return "Spring Cloud Nacos Discovery Client"; } public List<ServiceInstance> getInstances(String serviceId) { try { return this.serviceDiscovery.getInstances(serviceId); } catch (Exception var3) { throw new RuntimeException("Can not get hosts from nacos server. serviceId: " + serviceId, var3); } } }
调用NamingService,根据serviceId、group获得服务实例列表。然后把instance转化为ServiceInstance对象
public List<ServiceInstance> getInstances(String serviceId) throws NacosException { String group = this.discoveryProperties.getGroup(); List<Instance> instances = this.discoveryProperties.namingServiceInstance().selectInstances(serviceId, group, true); return hostToServiceInstanceList(instances, serviceId); }
NacosNamingService.selectInstances
selectInstances首先从hostReactor获取serviceInfo,然后再从serviceInfo.getHosts()剔除非healty、非enabled、weight小于等于0的instance再返回;如果subscribe为true,则执行hostReactor.getServiceInfo获取serviceInfo,否则执行hostReactor.getServiceInfoDirectlyFromServer获取serviceInfo
@Override public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException { ServiceInfo serviceInfo; if (subscribe) {//是否订阅服务地址的变化,默认为true serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } else { serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } return selectInstances(serviceInfo, healthy); }
HostReactor.getServiceInfo
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); //拼接服务名称+集群名称(默认为空) String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } //从ServiceInfoMap中根据key来查找服务提供者列表,ServiceInfoMap是客户端的服务地址的本 地缓存 ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); if (null == serviceObj) {//如果为空,表示本地缓存不存在 //如果找不到则创建一个新的然后放入serviceInfoMap,同时放入updatingMap,执行 updateServiceNow,再从updatingMap移除; serviceObj = new ServiceInfo(serviceName, clusters); serviceInfoMap.put(serviceObj.getKey(), serviceObj); updatingMap.put(serviceName, new Object()); updateServiceNow(serviceName, clusters); updatingMap.remove(serviceName); } else if (updatingMap.containsKey(serviceName)) { //如果从serviceInfoMap找出来的serviceObj在updatingMap中则等待UPDATE_HOLD_INTERVAL; if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish synchronized (serviceObj) { try { serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) { NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e); } } } } //如果本地缓存中存在,则通过scheduleUpdateIfAbsent开启定时任务,再从serviceInfoMap取出 serviceInfo scheduleUpdateIfAbsent(serviceName, clusters); return serviceInfoMap.get(serviceObj.getKey()); }
上述代码中,有两个逻辑,分别是
- updateServiceNow, 立马从Nacos server中去加载服务地址信息
- scheduleUpdateIfAbsent 开启定时调度,每1s去查询一次服务地址
updateServiceNow
public void updateServiceNow(String serviceName, String clusters) { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false); if (StringUtils.isNotEmpty(result)) { processServiceJSON(result); } } catch (Exception e) { NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e); } finally { if (oldService != null) { synchronized (oldService) { oldService.notifyAll(); } } } }
queryList
调用 /nacos/v1/ns/instance/list ,从Nacos server端获取服务地址信息。
public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException { final Map<String, String> params = new HashMap<String, String>(8); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put("clusters", clusters); params.put("udpPort", String.valueOf(udpPort)); params.put("clientIP", NetUtils.localIP()); params.put("healthyOnly", String.valueOf(healthyOnly)); return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET); }
springbppt集成nacos:
https://gitee.com/TongHuaShuShuoWoDeJieJu/springboot-nacos.git