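I drew the diagram above when I wrote about Eureka and am reusing it here, because learning similar technologies is largely about spotting what they share and where they differ. The registration and consumption flows of Eureka and Nacos are essentially the same. There are really only two differences: first, Eureka has no election mechanism during data synchronization; second, the two notify clients of data changes in different ways.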
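What Nacos service registration must provide
- A service provider registers its protocol address with the Nacos server
- A service consumer looks up the provider's address on the Nacos server (by service name)
- The Nacos server must detect when a provider goes online or offline
- A service consumer must learn dynamically when the service addresses held by the Nacos server change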
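To make the four capabilities concrete, here is a minimal in-memory sketch. It is not Nacos code: the class `TinyRegistry` and all of its method names are hypothetical, and it ignores networking, health checks and clustering entirely.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical in-memory registry used only for illustration; not part of Nacos.
final class TinyRegistry {
    // service name -> provider addresses ("ip:port")
    private final Map<String, List<String>> providers = new ConcurrentHashMap<>();
    // service name -> consumers that want to hear about address changes
    private final Map<String, List<Consumer<List<String>>>> listeners = new ConcurrentHashMap<>();

    // Capability 1: a provider registers its address.
    void register(String service, String address) {
        providers.computeIfAbsent(service, k -> new CopyOnWriteArrayList<>()).add(address);
        notifyListeners(service);
    }

    // Capability 3: the registry learns that a provider went offline.
    void deregister(String service, String address) {
        List<String> list = providers.get(service);
        if (list != null && list.remove(address)) {
            notifyListeners(service);
        }
    }

    // Capability 2: a consumer looks up addresses by service name.
    List<String> lookup(String service) {
        List<String> list = providers.getOrDefault(service, Collections.emptyList());
        return new ArrayList<>(list);
    }

    // Capability 4: a consumer subscribes to address changes.
    void subscribe(String service, Consumer<List<String>> listener) {
        listeners.computeIfAbsent(service, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    private void notifyListeners(String service) {
        List<String> snapshot = lookup(service);
        List<Consumer<List<String>>> targets =
            listeners.getOrDefault(service, Collections.emptyList());
        for (Consumer<List<String>> listener : targets) {
            listener.accept(snapshot);
        }
    }

    public static void main(String[] args) {
        TinyRegistry registry = new TinyRegistry();
        registry.subscribe("order-service", addrs -> System.out.println("changed: " + addrs));
        registry.register("order-service", "10.0.0.1:8080");
        registry.deregister("order-service", "10.0.0.1:8080");
        System.out.println(registry.lookup("order-service"));
    }
}
```

The rest of this article traces how the real Nacos client and server implement each of these steps over HTTP.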
Service registration
Integrating Spring Boot with Nacos
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.cloud</groupId>
            <artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>nacos-config-spring-boot-starter</artifactId>
            <version>0.2.7</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba.boot</groupId>
            <artifactId>nacos-discovery-spring-boot-starter</artifactId>
            <version>0.2.7</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
application.properties
    # Application name
    spring.application.name=springboot-nacos
    management.endpoints.jmx.exposure.include=*
    management.endpoints.web.exposure.include=*
    management.endpoint.health.show-details=always
    alibaba.cloud.access-key=****
    alibaba.cloud.secret-key=****
    nacos.discovery.server-addr=localhost:8848
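Code for registering and querying a service

    import com.alibaba.nacos.api.annotation.NacosInjected;
    import com.alibaba.nacos.api.config.annotation.NacosValue;
    import com.alibaba.nacos.api.exception.NacosException;
    import com.alibaba.nacos.api.naming.NamingService;
    import com.alibaba.nacos.api.naming.pojo.Instance;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.PostMapping;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class TestController {

        @NacosValue(value = "${info:default value}", autoRefreshed = true)
        private String info;

        // NamingService is the client API for service registration and lookup
        @NacosInjected
        private NamingService namingService;

        // Query the registered instances
        @GetMapping("/get")
        public String get() throws NacosException {
            // return info;
            return namingService.getAllInstances("SpringBoot-Nacos").toString();
        }

        // Register an instance
        @PostMapping("/registry")
        public String registry() throws NacosException {
            // Describe the service instance to register
            Instance instance = new Instance();
            instance.setClusterName("TestCluster"); // cluster name
            instance.setEnabled(true);              // whether the instance is enabled
            // true = ephemeral instance, synchronized in AP mode (Distro);
            // false = persistent instance, synchronized in CP mode (Raft)
            instance.setEphemeral(true);
            instance.setIp("localhost");
            instance.setPort(8848);
            instance.setWeight(10); // 1~100
            namingService.registerInstance("SpringBoot-Nacos", instance);
            return "SUCCESS";
        }
    }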
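How Nacos works
Nacos source analysis: the service registration flow
There are two paths by which a Dubbo service gets registered. One is the same path traced in the earlier Eureka source analysis (see that article); the other relies on the auto-configuration that Dubbo itself provides. When a service is published through Dubbo, registration goes through an event-listener mechanism: the class DubboServiceRegistrationNonWebApplicationAutoConfiguration listens for ApplicationStartedEvent, an event added in Spring Boot 2.0 that is published once the application has finished starting. When the listener receives this event, it triggers registration.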
    // Spring Boot publishes ApplicationStartedEvent once startup has completed
    @EventListener({ApplicationStartedEvent.class})
    public void onApplicationStarted() {
        this.setServerPort();
        this.register();
    }

    private void register() {
        // Register only once, even if the event fires again
        if (!this.registered) {
            this.serviceRegistry.register(this.registration);
            this.registered = true;
        }
    }
this.serviceRegistry is an injected instance of NacosServiceRegistry.
NacosServiceRegistry.register
- serviceId: the current application's spring.application.name
- group: the group configured on Nacos
- instance: the service instance information
    public void register(Registration registration) {
        if (StringUtils.isEmpty(registration.getServiceId())) {
            log.warn("No service to register for nacos client...");
        } else {
            String serviceId = registration.getServiceId();
            String group = this.nacosDiscoveryProperties.getGroup();
            Instance instance = this.getNacosInstanceFromRegistration(registration);
            try {
                this.namingService.registerInstance(serviceId, group, instance);
                log.info("nacos registry, {} {} {}:{} register finished", new Object[]{group, serviceId, instance.getIp(), instance.getPort()});
            } catch (Exception var6) {
                log.error("nacos registry, {} register failed...{},", new Object[]{serviceId, registration.toString(), var6});
                ReflectionUtils.rethrowRuntimeException(var6);
            }
        }
    }
NacosNamingService.registerInstance
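This is where instance registration starts. It does two things:
- If the instance being registered is ephemeral, it builds the heartbeat information and hands it to the beatReactor, which creates the heartbeat task
- It calls registerService to send the registration request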
    public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
        // Only ephemeral instances send heartbeats, so build the beat info for them
        if (instance.isEphemeral()) {
            BeatInfo beatInfo = new BeatInfo();
            beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
            beatInfo.setIp(instance.getIp());
            beatInfo.setPort(instance.getPort());
            beatInfo.setCluster(instance.getClusterName());
            beatInfo.setWeight(instance.getWeight());
            beatInfo.setMetadata(instance.getMetadata());
            beatInfo.setScheduled(false);
            beatInfo.setPeriod(instance.getInstanceHeartBeatInterval());

            // Hand the beat info to the beatReactor, which schedules the heartbeat task
            this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
        }
        this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
    }
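The heartbeat side of this can be pictured with a small scheduler. The following is only a sketch under my own assumptions, not the Nacos BeatReactor: the class `BeatSketch`, its method names, the `#`-separated key format and the use of `scheduleAtFixedRate` are all illustrative choices.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the client's beat reactor; not the Nacos class.
final class BeatSketch {
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "beat-sender");
            t.setDaemon(true); // do not keep the JVM alive just for heartbeats
            return t;
        });
    private final Map<String, ScheduledFuture<?>> beats = new ConcurrentHashMap<>();

    // One heartbeat task per instance, keyed by service name, IP and port (assumed format).
    static String buildKey(String serviceName, String ip, int port) {
        return serviceName + "#" + ip + "#" + port;
    }

    // Start sending beats for an instance; a repeated call replaces the old task.
    void addBeat(String serviceName, String ip, int port, long periodMillis, Runnable sendBeat) {
        ScheduledFuture<?> task =
            executor.scheduleAtFixedRate(sendBeat, 0, periodMillis, TimeUnit.MILLISECONDS);
        ScheduledFuture<?> old = beats.put(buildKey(serviceName, ip, port), task);
        if (old != null) {
            old.cancel(false);
        }
    }

    // Stop the heartbeat, for example when the instance is deregistered.
    void removeBeat(String serviceName, String ip, int port) {
        ScheduledFuture<?> task = beats.remove(buildKey(serviceName, ip, port));
        if (task != null) {
            task.cancel(false);
        }
    }

    int activeBeats() {
        return beats.size();
    }

    public static void main(String[] args) throws InterruptedException {
        BeatSketch reactor = new BeatSketch();
        reactor.addBeat("DEFAULT_GROUP@@demo", "127.0.0.1", 8080, 200,
            () -> System.out.println("beat sent"));
        Thread.sleep(500);
        reactor.removeBeat("DEFAULT_GROUP@@demo", "127.0.0.1", 8080);
    }
}
```

Keying the tasks by instance means a second registration of the same instance replaces its heartbeat instead of doubling it.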
registerService
- The logic is simple: build the request parameters
- Send the request
    public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
        LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});
        Map<String, String> params = new HashMap(9);
        params.put("namespaceId", this.namespaceId);
        params.put("serviceName", serviceName);
        params.put("groupName", groupName);
        params.put("clusterName", instance.getClusterName());
        params.put("ip", instance.getIp());
        params.put("port", String.valueOf(instance.getPort()));
        params.put("weight", String.valueOf(instance.getWeight()));
        params.put("enable", String.valueOf(instance.isEnabled()));
        params.put("healthy", String.valueOf(instance.isHealthy()));
        params.put("ephemeral", String.valueOf(instance.isEphemeral()));
        params.put("metadata", JSON.toJSONString(instance.getMetadata()));
        this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, "POST");
    }
reqAPI
Sends the service registration request.
- api: the Nacos server open API path
- params: the request parameters
- body: the request body
- method: the HTTP method
- servers: the Nacos server addresses
    public String reqAPI(String api, Map<String, String> params, String body, List<String> servers, String method) throws NacosException {
        params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
        if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) {
            throw new NacosException(NacosException.INVALID_PARAM, "no server available");
        }
        NacosException exception = new NacosException();
        // If a server list is configured
        if (servers != null && !servers.isEmpty()) {
            // Pick a random server node to start with
            Random random = new Random(System.currentTimeMillis());
            int index = random.nextInt(servers.size());
            // Try each server at most once
            for (int i = 0; i < servers.size(); i++) {
                String server = servers.get(index); // the server node at the current index
                try {
                    // Call the selected server
                    return callServer(api, params, body, server, method);
                } catch (NacosException e) {
                    exception = e;
                    if (NAMING_LOGGER.isDebugEnabled()) {
                        NAMING_LOGGER.debug("request {} failed.", server, e);
                    }
                }
                // Move on to the next server, wrapping around
                index = (index + 1) % servers.size();
            }
        }
        if (StringUtils.isNotBlank(nacosDomain)) {
            for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
                try {
                    return callServer(api, params, body, nacosDomain, method);
                } catch (NacosException e) {
                    exception = e;
                    if (NAMING_LOGGER.isDebugEnabled()) {
                        NAMING_LOGGER.debug("request {} failed.", nacosDomain, e);
                    }
                }
            }
        }
        NAMING_LOGGER.error("request: {} failed, servers: {}, code: {}, msg: {}", api, servers, exception.getErrCode(), exception.getErrMsg());
        throw new NacosException(exception.getErrCode(), "failed to req API:/api/" + api + " after all servers(" + servers + ") tried: " + exception.getMessage());
    }
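The server-selection strategy in reqAPI (start at a random node, then walk the list round-robin until one call succeeds) can be isolated into a few lines. This is a sketch only; `FailoverSketch` and `callWithFailover` are hypothetical names, and the start index is passed in so the behavior is easy to follow.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.function.Function;

// Hypothetical helper illustrating "random start, then round-robin" failover.
final class FailoverSketch {
    // Try each server at most once, starting at startIndex and wrapping around.
    static String callWithFailover(List<String> servers, int startIndex,
                                   Function<String, String> call) {
        RuntimeException last = new IllegalStateException("no server available");
        int index = startIndex;
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server);
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next node
            }
            index = (index + 1) % servers.size();
        }
        throw last; // every node failed (or the list was empty)
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("n1:8848", "n2:8848", "n3:8848");
        int start = new Random().nextInt(servers.size());
        String result = callWithFailover(servers, start, server -> {
            if (server.startsWith("n1")) {
                throw new IllegalStateException(server + " is down");
            }
            return "ok from " + server;
        });
        System.out.println(result);
    }
}
```

Starting at a random index spreads requests across the cluster, while the wrap-around loop guarantees that a single failed node does not fail the request.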
callServer
Calls a single server.
    public String callServer(String api, Map<String, String> params, String body, String curServer, String method) throws NacosException {
        long start = System.currentTimeMillis();
        long end = 0;

        // Add the security (signature) information
        injectSecurityInfo(params);

        // Build the request headers
        List<String> headers = builderHeaders();
        String url;
        // Assemble the URL
        if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) {
            url = curServer + api;
        } else {
            if (!curServer.contains(UtilAndComs.SERVER_ADDR_IP_SPLITER)) {
                curServer = curServer + UtilAndComs.SERVER_ADDR_IP_SPLITER + serverPort;
            }
            url = HttpClient.getPrefix() + curServer + api;
        }
        // Send the request through HttpClient
        HttpClient.HttpResult result = HttpClient.request(url, headers, params, body, UtilAndComs.ENCODING, method);
        end = System.currentTimeMillis();
        MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(result.code))
            .observe(end - start);
        if (HttpURLConnection.HTTP_OK == result.code) {
            // Return the server's response
            return result.content;
        }
        if (HttpURLConnection.HTTP_NOT_MODIFIED == result.code) {
            return StringUtils.EMPTY;
        }
        throw new NacosException(result.code, result.content);
    }
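The URL assembly in callServer has three cases, which the following sketch spells out. `ServerUrlSketch.buildUrl` is a hypothetical helper; the `http://` default scheme and the `:` separator are my assumptions standing in for HttpClient.getPrefix() and UtilAndComs.SERVER_ADDR_IP_SPLITER.

```java
// Hypothetical helper mirroring the URL assembly described above.
final class ServerUrlSketch {
    static String buildUrl(String server, String api, int defaultPort) {
        if (server.startsWith("http://") || server.startsWith("https://")) {
            return server + api; // scheme already present, use the address as-is
        }
        if (!server.contains(":")) {
            server = server + ":" + defaultPort; // bare host, append the default port
        }
        return "http://" + server + api; // assumed default scheme
    }

    public static void main(String[] args) {
        String api = "/nacos/v1/ns/instance";
        System.out.println(buildUrl("localhost", api, 8848));
        System.out.println(buildUrl("10.0.0.5:8849", api, 8848));
        System.out.println(buildUrl("https://nacos.example.com", api, 8848));
    }
}
```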
request
Sends the HTTP request and returns the result.
    public static HttpResult request(String url, List<String> headers, Map<String, String> paramValues, String body, String encoding, String method) {
        HttpURLConnection conn = null;
        try {
            String encodedContent = encodingParams(paramValues, encoding);
            url += (StringUtils.isEmpty(encodedContent)) ? "" : ("?" + encodedContent);
            conn = (HttpURLConnection) new URL(url).openConnection();
            setHeaders(conn, headers, encoding);
            conn.setConnectTimeout(CON_TIME_OUT_MILLIS);
            conn.setReadTimeout(TIME_OUT_MILLIS);
            conn.setRequestMethod(method);
            conn.setDoOutput(true);
            if (StringUtils.isNotBlank(body)) {
                byte[] b = body.getBytes();
                conn.setRequestProperty("Content-Length", String.valueOf(b.length));
                conn.getOutputStream().write(b, 0, b.length);
                conn.getOutputStream().flush();
                conn.getOutputStream().close();
            }
            conn.connect();
            if (NAMING_LOGGER.isDebugEnabled()) {
                NAMING_LOGGER.debug("Request from server: " + url);
            }
            return getResult(conn);
        } catch (Exception e) {
            try {
                if (conn != null) {
                    NAMING_LOGGER.warn("failed to request " + conn.getURL() + " from " + InetAddress.getByName(conn.getURL().getHost()).getHostAddress());
                }
            } catch (Exception e1) {
                NAMING_LOGGER.error("[NA] failed to request ", e1);
                //ignore
            }
            NAMING_LOGGER.error("[NA] failed to request ", e);
            return new HttpResult(500, e.toString(), Collections.<String, String>emptyMap());
        } finally {
            IoUtils.closeQuietly(conn);
        }
    }
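Note that request appends the parameters to the URL as a query string, even for POST. The source of encodingParams is not shown in this article, so the following is only a guess at the idea: `QueryStringSketch.encodeParams` is a hypothetical helper, and skipping empty values is an assumption of mine.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: turns a parameter map into a URL-encoded query string.
final class QueryStringSketch {
    static String encodeParams(Map<String, String> params) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : params.entrySet()) {
            String value = entry.getValue();
            if (value == null || value.isEmpty()) {
                continue; // assumption: parameters without a value are left out
            }
            if (sb.length() > 0) {
                sb.append('&');
            }
            sb.append(entry.getKey()).append('=').append(encode(value));
        }
        return sb.toString();
    }

    private static String encode(String value) {
        try {
            return URLEncoder.encode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always supported
        }
    }

    public static void main(String[] args) {
        Map<String, String> params = new LinkedHashMap<>();
        params.put("serviceName", "DEFAULT_GROUP@@demo");
        params.put("ip", "127.0.0.1");
        params.put("port", "8080");
        System.out.println("/nacos/v1/ns/instance?" + encodeParams(params));
    }
}
```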
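Server-side handling in Nacos
The server provides an InstanceController class that exposes the registration-related APIs. When a client registers, the endpoint it calls is: [post]: /nacos/v1/ns/instance
- serviceName: the name of the client application
- namespace: the Nacos namespace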
    @RestController
    @RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
    public class InstanceController {

        @CanDistro
        @PostMapping
        @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
        public String register(HttpServletRequest request) throws Exception {
            final String namespaceId = WebUtils
                .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
            final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
            NamingUtils.checkServiceNameFormat(serviceName);
            // Parse the Instance from the request
            final Instance instance = parseInstance(request);
            serviceManager.registerInstance(namespaceId, serviceName, instance);
            return "ok";
        }
    }
ServiceManager.registerInstance
    public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
        // Create an empty service (the entry shown in the service list of the Nacos console).
        // In effect this initializes an entry in serviceMap, which is a ConcurrentHashMap
        createEmptyService(namespaceId, serviceName, instance.isEphemeral());
        // Look up the Service object in serviceMap by namespaceId and serviceName
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName);
        }
        // Call addInstance to add the service instance
        addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
    }
createServiceIfAbsent
    public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
        // Look up the Service object in serviceMap
        Service service = getService(namespaceId, serviceName);
        if (service == null) {
            Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
            service = new Service();
            service.setName(serviceName);
            service.setNamespaceId(namespaceId);
            service.setGroupName(NamingUtils.getGroupName(serviceName));
            // now validate the service. if failed, exception will be thrown
            service.setLastModifiedMillis(System.currentTimeMillis());
            service.recalculateChecksum();
            if (cluster != null) { // a cluster was supplied, so attach it to the new service
                cluster.setService(service);
                service.getClusterMap().put(cluster.getName(), cluster);
            }
            service.validate();
            putServiceAndInit(service);
            if (!local) {
                addOrReplaceService(service);
            }
        }
    }
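The get-or-create pattern over serviceMap can be sketched with a nested map keyed first by namespace and then by service name. This is an illustration, not the ServiceManager source: `ServiceMapSketch` and `ServiceEntry` are hypothetical, and `computeIfAbsent` is my substitution for the explicit null check above.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of a namespace -> (service name -> service) registry.
final class ServiceMapSketch {
    static final class ServiceEntry {
        final String namespaceId;
        final String name;

        ServiceEntry(String namespaceId, String name) {
            this.namespaceId = namespaceId;
            this.name = name;
        }
    }

    private final Map<String, Map<String, ServiceEntry>> serviceMap = new ConcurrentHashMap<>();

    // Returns null when the namespace or the service is unknown.
    ServiceEntry getService(String namespaceId, String serviceName) {
        Map<String, ServiceEntry> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }

    // Atomically creates the service the first time it is seen.
    ServiceEntry createServiceIfAbsent(String namespaceId, String serviceName) {
        return serviceMap
            .computeIfAbsent(namespaceId, ns -> new ConcurrentHashMap<>())
            .computeIfAbsent(serviceName, name -> new ServiceEntry(namespaceId, name));
    }

    public static void main(String[] args) {
        ServiceMapSketch manager = new ServiceMapSketch();
        ServiceEntry first = manager.createServiceIfAbsent("public", "DEFAULT_GROUP@@demo");
        ServiceEntry second = manager.createServiceIfAbsent("public", "DEFAULT_GROUP@@demo");
        System.out.println(first == second); // the second call reuses the existing entry
    }
}
```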
putServiceAndInit
    private void putServiceAndInit(Service service) throws NacosException {
        putService(service); // store the service in serviceMap
        service = getService(service.getNamespaceId(), service.getName());
        service.init(); // set up the heartbeat (health) check
        // Listen for consistency-layer changes on both instance lists:
        // the ephemeral key (true) is synchronized with Distro, the persistent key (false) with Raft
        consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
        consistencyService
            .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
        Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
    }
addInstance
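Returning to the caller, we now enter addInstance, which adds the service instance to the instance list and then synchronizes the data through the consistency protocol.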
    public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
        String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
        Service service = getService(namespaceId, serviceName);
        synchronized (service) {
            List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
            Instances instances = new Instances();
            instances.setInstanceList(instanceList);
            consistencyService.put(key, instances);
        }
    }
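Because the key carries the ephemeral flag, a single put call can be routed to the right protocol. The sketch below shows that routing idea with two plain maps standing in for Distro and Raft. Everything in it is hypothetical, including the key format; the real format produced by KeyBuilder is not shown in this article.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: route writes by key to an "ephemeral" or a "persistent" store.
final class ConsistencyDelegateSketch {
    static final String EPHEMERAL_PREFIX = "iplist.ephemeral."; // made-up key prefix
    static final String PERSISTENT_PREFIX = "iplist.";          // made-up key prefix

    final Map<String, Object> ephemeralStore = new HashMap<>();  // stands in for Distro (AP)
    final Map<String, Object> persistentStore = new HashMap<>(); // stands in for Raft (CP)

    static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
        String prefix = ephemeral ? EPHEMERAL_PREFIX : PERSISTENT_PREFIX;
        return prefix + namespaceId + "##" + serviceName;
    }

    // The key alone decides which store (and therefore which protocol) handles the write.
    void put(String key, Object value) {
        if (key.startsWith(EPHEMERAL_PREFIX)) {
            ephemeralStore.put(key, value);
        } else {
            persistentStore.put(key, value);
        }
    }

    public static void main(String[] args) {
        ConsistencyDelegateSketch delegate = new ConsistencyDelegateSketch();
        delegate.put(buildInstanceListKey("public", "demo", true), "127.0.0.1:8080");
        delegate.put(buildInstanceListKey("public", "demo", false), "127.0.0.1:9090");
        System.out.println(delegate.ephemeralStore.keySet());
        System.out.println(delegate.persistentStore.keySet());
    }
}
```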
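Service lookup by the consumer
Once registration succeeds, consumers can obtain the providers' addresses from the Nacos server and invoke the service. On the consumer side, the core class NacosDiscoveryClient handles the interaction with Nacos to obtain provider addresses. The preceding steps are not repeated here, since the subscription process was already covered in the earlier Dubbo source analysis. NacosDiscoveryClient provides a getInstances method that returns the addresses of a service's providers given the service name.
Fetching the service list at client startup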
NacosDiscoveryClient.getInstances
    public class NacosDiscoveryClient implements DiscoveryClient {
        private static final Logger log = LoggerFactory.getLogger(NacosDiscoveryClient.class);
        public static final String DESCRIPTION = "Spring Cloud Nacos Discovery Client";
        private NacosServiceDiscovery serviceDiscovery;

        public NacosDiscoveryClient(NacosServiceDiscovery nacosServiceDiscovery) {
            this.serviceDiscovery = nacosServiceDiscovery;
        }

        public String description() {
            return "Spring Cloud Nacos Discovery Client";
        }

        public List<ServiceInstance> getInstances(String serviceId) {
            try {
                return this.serviceDiscovery.getInstances(serviceId);
            } catch (Exception var3) {
                throw new RuntimeException("Can not get hosts from nacos server. serviceId: " + serviceId, var3);
            }
        }
    }
The getInstances method it delegates to calls NamingService to fetch the instance list by serviceId and group, then converts each Instance into a ServiceInstance object.
    public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
        String group = this.discoveryProperties.getGroup();
        List<Instance> instances = this.discoveryProperties.namingServiceInstance().selectInstances(serviceId, group, true);
        return hostToServiceInstanceList(instances, serviceId);
    }
NacosNamingService.selectInstances
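selectInstances first obtains a serviceInfo from hostReactor, then removes from serviceInfo.getHosts() every instance that is not healthy, not enabled, or has a weight of 0 or less, and returns the rest. If subscribe is true it obtains the serviceInfo through hostReactor.getServiceInfo; otherwise it calls hostReactor.getServiceInfoDirectlyFromServer.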
    @Override
    public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy, boolean subscribe) throws NacosException {
        ServiceInfo serviceInfo;
        if (subscribe) { // whether to subscribe to address changes; true by default
            serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        } else {
            serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
        }
        return selectInstances(serviceInfo, healthy);
    }
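The filtering step described above is easy to show in isolation. The two-argument selectInstances overload is not listed in this article, so the following is a sketch of the rule as stated in the text, with a hypothetical `Host` class in place of the Nacos Instance.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the instance filter: keep hosts whose health matches,
// that are enabled, and whose weight is greater than zero.
final class InstanceFilterSketch {
    static final class Host {
        final String ip;
        final boolean healthy;
        final boolean enabled;
        final double weight;

        Host(String ip, boolean healthy, boolean enabled, double weight) {
            this.ip = ip;
            this.healthy = healthy;
            this.enabled = enabled;
            this.weight = weight;
        }
    }

    static List<Host> select(List<Host> hosts, boolean healthy) {
        List<Host> result = new ArrayList<>();
        for (Host host : hosts) {
            if (host.healthy == healthy && host.enabled && host.weight > 0) {
                result.add(host);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Host> hosts = Arrays.asList(
            new Host("10.0.0.1", true, true, 1.0),
            new Host("10.0.0.2", false, true, 1.0),  // unhealthy
            new Host("10.0.0.3", true, false, 1.0),  // disabled
            new Host("10.0.0.4", true, true, 0.0));  // zero weight
        for (Host host : select(hosts, true)) {
            System.out.println(host.ip);
        }
    }
}
```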
HostReactor.getServiceInfo
    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
        // Build the key from the service name plus the cluster names (empty by default)
        String key = ServiceInfo.getKey(serviceName, clusters);
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        // Look up the provider list in serviceInfoMap, the client's local cache of service addresses
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        if (null == serviceObj) { // not in the local cache
            // Create a new ServiceInfo, put it into serviceInfoMap and mark it in updatingMap,
            // run updateServiceNow, then remove it from updatingMap
            serviceObj = new ServiceInfo(serviceName, clusters);
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            updatingMap.put(serviceName, new Object());
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
        } else if (updatingMap.containsKey(serviceName)) {
            // The cached serviceObj is currently being updated, so wait up to UPDATE_HOLD_INTERVAL
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER.error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        // Make sure a periodic refresh task exists for this service,
        // then return the serviceInfo held in serviceInfoMap
        scheduleUpdateIfAbsent(serviceName, clusters);
        return serviceInfoMap.get(serviceObj.getKey());
    }
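The code above contains two pieces of logic:
- updateServiceNow: immediately loads the service addresses from the Nacos server
- scheduleUpdateIfAbsent: starts a scheduled task that queries the service addresses every 1s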
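Taken together, the two steps form a "load now on a cache miss, then refresh in the background" pattern. Here is a minimal sketch of that pattern, not the HostReactor implementation: `HostCacheSketch`, its loader function and the configurable refresh interval are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch of a client-side address cache with background refresh.
final class HostCacheSketch {
    private final Map<String, List<String>> cache = new ConcurrentHashMap<>();
    private final Map<String, ScheduledFuture<?>> refreshTasks = new ConcurrentHashMap<>();
    private final ScheduledExecutorService executor =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "host-cache-refresh");
            t.setDaemon(true); // background refresh must not block JVM exit
            return t;
        });
    private final Function<String, List<String>> loader; // stands in for the call to the server
    private final long refreshMillis;

    HostCacheSketch(Function<String, List<String>> loader, long refreshMillis) {
        this.loader = loader;
        this.refreshMillis = refreshMillis;
    }

    List<String> getServiceInfo(String service) {
        if (!cache.containsKey(service)) {
            // Cache miss: load immediately (the role of updateServiceNow)
            cache.put(service, loader.apply(service));
        }
        scheduleUpdateIfAbsent(service);
        return cache.get(service);
    }

    // Start exactly one refresh task per service, no matter how often it is requested.
    private void scheduleUpdateIfAbsent(String service) {
        refreshTasks.computeIfAbsent(service, s ->
            executor.scheduleWithFixedDelay(
                () -> cache.put(s, loader.apply(s)),
                refreshMillis, refreshMillis, TimeUnit.MILLISECONDS));
    }

    public static void main(String[] args) {
        HostCacheSketch cache = new HostCacheSketch(
            service -> Arrays.asList("10.0.0.1:8080", "10.0.0.2:8080"), 1000);
        System.out.println(cache.getServiceInfo("DEFAULT_GROUP@@demo"));
    }
}
```

Repeated lookups are served from the local map, so consumers keep working from the cached addresses between refreshes.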
updateServiceNow
    public void updateServiceNow(String serviceName, String clusters) {
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUDPPort(), false);
            if (StringUtils.isNotEmpty(result)) {
                processServiceJSON(result);
            }
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }
queryList
Calls /nacos/v1/ns/instance/list to fetch the service addresses from the Nacos server.
    public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {
        final Map<String, String> params = new HashMap<String, String>(8);
        params.put(CommonParams.NAMESPACE_ID, namespaceId);
        params.put(CommonParams.SERVICE_NAME, serviceName);
        params.put("clusters", clusters);
        params.put("udpPort", String.valueOf(udpPort));
        params.put("clientIP", NetUtils.localIP());
        params.put("healthyOnly", String.valueOf(healthyOnly));
        return reqAPI(UtilAndComs.NACOS_URL_BASE + "/instance/list", params, HttpMethod.GET);
    }
Sample project for the Spring Boot and Nacos integration:
https://gitee.com/TongHuaShuShuoWoDeJieJu/springboot-nacos.git