Nacos 服務注冊需要具備的能力:
- 服務提供者把自己的協議地址注冊到Nacos server
- 服務消費者需要從Nacos Server上去查詢服務提供者的地址(根據服務名稱)
- Nacos Server需要感知到服務提供者的上下線的變化
- 服務消費者需要動態感知到Nacos Server端服務地址的變化
作為注冊中心所需要的能力大多如此,我們需要做的是理解各種注冊中心的獨有特性,總結他們的共性。
Nacos的實現原理:
下面我們先來了解一下 Nacos 注冊中心的實現原理,通過下面這張圖來說明。
圖中的流程是大家所熟悉的,不同的是在Nacos 中,服務注冊時在服務端本地會通過輪詢注冊中心集群節點地址進行服務得注冊,在注冊中心上,即Nacos Server上采用了Map保存實例信息,當然配置了持久化的服務會被保存到數據庫中,在服務的調用方,為了保證本地服務實例列表的動態感知,Nacos與其他注冊中心不同的是,采用了 Pull/Push同時運作的方式。通過這些我們對Nacos注冊中心的原理有了一定的了解。我們從源碼層面去驗證這些理論知識。
Nacos的源碼分析(結合spring-cloud-alibaba +dubbo +nacos 的整合):
服務注冊的流程:
在基於Dubbo服務發布的過程中, 自動裝配是走的事件監聽機制,在 DubboServiceRegistrationNonWebApplicationAutoConfiguration 這個類中,這個類會監聽 ApplicationStartedEvent 事件,這個事件是spring boot在2.0新增的,就是當spring boot應用啟動完成之后會發布這個時間。而此時監聽到這個事件之后,會觸發注冊的動作。
@EventListener(ApplicationStartedEvent.class) public void onApplicationStarted() { setServerPort(); register(); } private void register() { if (registered) { return; } serviceRegistry.register(registration); registered = true; }
this.serviceRegistry。 是spring-cloud提供的接口實現(org.springframework.cloud.client.serviceregistry.ServiceRegistry).很顯然注入的實例是: NacosServiceRegistry
然后進入到實現類的注冊方法:
@Override public void register(Registration registration) { if (StringUtils.isEmpty(registration.getServiceId())) { log.warn("No service to register for nacos client..."); return; } //對應當前應用的application.name
String serviceId = registration.getServiceId(); //表示nacos上的分組配置
String group = nacosDiscoveryProperties.getGroup(); //表示服務實例信息
Instance instance = getNacosInstanceFromRegistration(registration); try { //通過命名服務進行注冊
namingService.registerInstance(serviceId, group, instance); log.info("nacos registry, {} {} {}:{} register finished", group, serviceId, instance.getIp(), instance.getPort()); } catch (Exception e) { log.error("nacos registry, {} register failed...{},", serviceId, registration.toString(), e); // rethrow a RuntimeException if the registration is failed. // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
rethrowRuntimeException(e); } }
接下去就是開始注冊實例,主要做兩個動作
- 如果當前注冊的是臨時節點,則構建心跳信息,通過beat反應堆來構建心跳任務
- 調用registerService發起服務注冊
@Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { ////是否是臨時節點,如果是臨時節點,則構建心跳信息
if (instance.isEphemeral()) { BeatInfo beatInfo = new BeatInfo(); beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName)); beatInfo.setIp(instance.getIp()); beatInfo.setPort(instance.getPort()); beatInfo.setCluster(instance.getClusterName()); beatInfo.setWeight(instance.getWeight()); beatInfo.setMetadata(instance.getMetadata()); beatInfo.setScheduled(false); //beatReactor, 添加心跳信息進行處理
beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo); } //調用服務代理類進行注冊
serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance); }
然后調用 NamingProxy 的注冊方法進行注冊,代碼邏輯很簡單,構建請求參數,發起請求。
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", namespaceId, serviceName, instance); final Map<String, String> params = new HashMap<String, String>(8); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JSON.toJSONString(instance.getMetadata())); reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST); }
往下走我們就會發現上面提到的,服務在進行注冊的時候會輪詢配置好的注冊中心的地址:
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) { params.put(CommonParams.NAMESPACE_ID, getNamespaceId()); if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(nacosDomain)) { throw new IllegalArgumentException("no server available"); } Exception exception = new Exception(); //如果服務地址不為空
if (servers != null && !servers.isEmpty()) { //隨機獲取一台服務器節點
Random random = new Random(System.currentTimeMillis()); int index = random.nextInt(servers.size()); // 遍歷服務列表
for (int i = 0; i < servers.size(); i++) { String server = servers.get(index);//獲得索引位置的服務節點
try {//調用指定服務
return callServer(api, params, server, method); } catch (NacosException e) { exception = e; NAMING_LOGGER.error("request {} failed.", server, e); } catch (Exception e) { exception = e; NAMING_LOGGER.error("request {} failed.", server, e); } //輪詢
index = (index + 1) % servers.size(); } // .......... }
最后通過 callServer(api, params, server, method) 發起調用,這里通過 JSK自帶的 HttpURLConnection 進行發起調用。我們可以通過斷點的方式來看到這里的請求參數:
期間可能會有多個 GET的請求獲取服務列表,是正常的,會發現有如上的一個請求,會調用 http://192.168.200.1:8848/nacos/v1/ns/instance 這個地址。那么接下去就是Nacos Server 接受到服務端的注冊請求的處理流程。需要下載Nacos Server 源碼,
源碼下載可以參考 :https://www.cnblogs.com/wuzhenzhao/p/11384266.html
Nacos服務端的處理:
服務端提供了一個InstanceController類,在這個類中提供了服務注冊相關的API,而服務端發起注冊時,調用的接口是: [post]: /nacos/v1/ns/instance ,serviceName: 代表客戶端的項目名稱 ,namespace: nacos 的namespace。
@CanDistro @PostMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception { final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); // 從請求中解析出instance實例 final Instance instance = parseInstance(request); serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok"; }
然后調用 ServiceManager 進行服務的注冊
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { //創建一個空服務,在Nacos控制台服務列表展示的服務信息,實際上是初始化一個serviceMap,它是一個ConcurrentHashMap集合 createEmptyService(namespaceId, serviceName, instance.isEphemeral()); //從serviceMap中,根據namespaceId和serviceName得到一個服務對象 Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } //調用addInstance創建一個服務實例 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }
在創建空的服務實例的時候我們發現了存儲實例的map:
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException { //從serviceMap中獲取服務對象 Service service = getService(namespaceId, serviceName); if (service == null) {//如果為空。則初始化 Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); service = new Service(); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(NamingUtils.getGroupName(serviceName)); // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); if (cluster != null) { cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster); } service.validate(); putServiceAndInit(service); if (!local) { addOrReplaceService(service); } }
在 getService 方法中我們發現了Map:
/** * Map(namespace, Map(group::serviceName, Service)). */ private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
通過注釋我們可以知道,Nacos是通過不同的 namespace 來維護服務的,而每個namespace下有不同的group,不同的group下才有對應的Service ,再通過這個 serviceName 來確定服務實例。
第一次進來則會進入初始化,初始化完會調用 putServiceAndInit
private void putServiceAndInit(Service service) throws NacosException { putService(service);//把服務信息保存到serviceMap集合
service.init();//建立心跳檢測機制 //實現數據一致性監聽,ephemeral(標識服務是否為臨時服務,默認是持久化的,也就是true)=true表示采用raft協議,false表示采用Distro
consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson()); }
獲取到服務以后把服務實例添加到集合中,然后基於一致性協議進行數據的同步。然后調用 addInstance
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { // 組裝key
String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); // 獲取剛剛組裝的服務
Service service = getService(namespaceId, serviceName); synchronized (service) { List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); // 也就是上一步實現監聽的類里添加注冊服務
consistencyService.put(key, instances); } }
然后給服務注冊方發送注冊成功的響應。結束服務注冊流程。其中細節后續慢慢分析。