Overview
The registry server's main functions are accepting service registration, service discovery, and service deregistration from clients. Beyond these client-facing interactions, the server also does something more important: the AP and CP we often hear about in distributed systems. As a cluster, Nacos implements both AP and CP: AP with its home-grown Distro protocol, and CP with the Raft protocol. That involves heartbeats, leader election, and more, so it is genuinely complex.
This article focuses on how the registry server handles client service registration; the other functions are left for later.
The server-side endpoint that accepts client registrations is as follows:
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {

    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);

    final Instance instance = parseInstance(request);

    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}
This method is in the com.alibaba.nacos.naming.controllers.InstanceController class.
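For a sense of how this endpoint gets hit, here is a minimal client-side registration using the Nacos Java client API; the server address, service name, and instance address are illustrative:

import com.alibaba.nacos.api.naming.NamingFactory;
import com.alibaba.nacos.api.naming.NamingService;

// Minimal registration demo: this call ultimately sends the HTTP request
// that lands in the register() endpoint above.
public class RegisterDemo {
    public static void main(String[] args) throws Exception {
        NamingService naming = NamingFactory.createNamingService("127.0.0.1:8848");
        // register an instance (ephemeral by default) of "demo-service"
        naming.registerInstance("demo-service", "192.168.0.10", 8080);
    }
}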
Service registration flow diagram
Based on the flow diagram, I split the process into two parts. Part one is updating the local cache: a registration is not sent to every node in the cluster; the client picks one node to register with. Part two is synchronizing the registered service information to the other cluster nodes. Let's analyze these two parts in detail.
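To make the "one node" point concrete, the client-side selection is roughly this shape — a hypothetical sketch, not the actual NamingProxy code:

import java.util.List;
import java.util.Random;

// Hypothetical sketch: registration picks one server from the cluster list
// rather than broadcasting to all of them; NamingProxy#reqAPI follows this
// general pattern (retrying against other servers on failure).
public class ServerPicker {

    private static final Random RANDOM = new Random(System.currentTimeMillis());

    public static String pick(List<String> servers) {
        if (servers == null || servers.isEmpty()) {
            throw new IllegalArgumentException("no nacos servers configured");
        }
        return servers.get(RANDOM.nextInt(servers.size()));
    }
}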
Part one: updating the local cache
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {

    // Check whether the namespace exists in the local cache and create it if not;
    // then check whether the service exists under that namespace and, if not,
    // create an empty service. Note: the service's instance list is NOT updated here.
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());

    // fetch the service from the local cache
    Service service = getService(namespaceId, serviceName);

    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
            "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }

    // register the instance; only this step binds the instance to the service
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
Step into com.alibaba.nacos.naming.core.ServiceManager#createEmptyService, which delegates to createServiceIfAbsent:
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster)
    throws NacosException {
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        // the service does not exist yet, so create an empty one
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();

        // insert the empty service into the cache and initialize it
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}
Step into com.alibaba.nacos.naming.core.ServiceManager#putServiceAndInit:
private void putServiceAndInit(Service service) throws NacosException {
    // insert the service into the cache
    putService(service);
    // start a periodic health-check task for the service
    service.init();
    // listen for changes to both the ephemeral (true) and persistent (false)
    // instance lists of this service
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJSON());
}
1. putService is straightforward, so I won't analyze it in detail.
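For completeness, a minimal sketch of what putService amounts to, assuming the two-level serviceMap (namespaceId -> serviceName -> Service) used in the 1.x source; field and lock names may differ across versions:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of ServiceManager#putService: create the per-namespace map on first
// use (double-checked under a lock), then register the service by name.
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
private final Object putServiceLock = new Object();

public void putService(Service service) {
    if (!serviceMap.containsKey(service.getNamespaceId())) {
        synchronized (putServiceLock) {
            // double-checked locking so concurrent registrations in a new
            // namespace create the inner map only once
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                serviceMap.put(service.getNamespaceId(), new ConcurrentHashMap<>(16));
            }
        }
    }
    serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}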
2. Step into com.alibaba.nacos.naming.core.Service#init
I won't go deeper here; I plan to write a dedicated article on health checking, because the Alibaba authors use TCP connections together with NIO for the checks, and that part is fairly involved.
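Still, for orientation: init() in the 1.x line is short. It schedules the client-beat (heartbeat) expiry check for ephemeral instances and initializes each cluster, which is where the TCP/NIO health check lives. A sketch, with method names as in the 1.x source (may vary by version):

// Sketch of Service#init: schedule the heartbeat expiry check for this
// service, then initialize every cluster under it.
public void init() {
    // periodically checks whether ephemeral instances' heartbeats have expired
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init(); // starts the cluster-level (TCP/NIO) health check
    }
}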
3. The listening code: this part also involves quite a lot, and it is implemented with the observer pattern, as the sketch below shows.
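Concretely, Service implements the RecordListener interface (shape as in the 1.x source), so the two consistencyService.listen(...) calls above register the service itself as an observer of its own instance-list keys; when the consistency layer writes a new instance list under one of those keys, the service is called back and refreshes its in-memory instances:

// The listener contract (1.x shape). Service implements this interface.
public interface RecordListener<T extends Record> {

    // does this listener care about the given key?
    boolean interests(String key);

    // should this listener stop listening on the given key?
    boolean matchUnlistenKey(String key);

    // invoked when the data under the key changes
    void onChange(String key, T value) throws Exception;

    // invoked when the data under the key is deleted
    void onDelete(String key) throws Exception;
}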
To sum up: that wraps up the analysis of part one. There are still many details in there that deserve a closer look.
Part two: writing the service information and syncing it to the other nodes
Step into the com.alibaba.nacos.naming.consistency.DelegateConsistencyServiceImpl#put() method:
@Override
public void put(String key, Record value) throws NacosException {
    mapConsistencyService(key).put(key, value);
}
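The routing happens in mapConsistencyService, which is roughly the following in the 1.x source (field names may differ): ephemeral keys go to Distro, persistent keys go to Raft.

// Sketch of DelegateConsistencyServiceImpl#mapConsistencyService:
// ephemeral keys -> Distro (AP), everything else -> Raft (CP).
private ConsistencyService mapConsistencyService(String key) {
    return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}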
The service we register is ephemeral. To see why, look at the parameter-parsing code: com.alibaba.nacos.naming.controllers.InstanceController#parseInstance() internally calls the getIPAddress() method. Step into com.alibaba.nacos.naming.controllers.InstanceController#getIPAddress() and you will find this code:
boolean ephemeral = BooleanUtils.toBoolean(
    WebUtils.optional(request, "ephemeral", String.valueOf(switchDomain.isDefaultInstanceEphemeral())));

Instance instance = new Instance();
instance.setPort(Integer.parseInt(port));
instance.setIp(ip);
instance.setWeight(Double.parseDouble(weight));
instance.setClusterName(cluster);
instance.setHealthy(healthy);
instance.setEnabled(enabled);
instance.setEphemeral(ephemeral);
This parameter is actually set by the client itself when it registers the service.
See the client code in com.alibaba.nacos.client.naming.net.NamingProxy#registerService:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {

    NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}",
        namespaceId, serviceName, instance);

    final Map<String, String> params = new HashMap<String, String>(9);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put(CommonParams.GROUP_NAME, groupName);
    params.put(CommonParams.CLUSTER_NAME, instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    // whether this is an ephemeral instance; defaults to true
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));

    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}
So the put() above dispatches to com.alibaba.nacos.naming.consistency.ephemeral.distro.DistroConsistencyServiceImpl#put(). Step in:
@Override
public void put(String key, Record value) throws NacosException {
    // put the service data into the local cache
    onPut(key, value);
    // add a task to sync the service data to the other nodes
    taskDispatcher.addTask(key);
}
1. The onPut method is fairly simple, so I won't walk through it line by line.
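For reference, a sketch of what it does in the 1.x line (names such as ApplyAction vary slightly across versions): wrap the instance list in a versioned Datum, put it into the in-memory DataStore, and queue an async CHANGE notification for the listeners on that key.

// Sketch of DistroConsistencyServiceImpl#onPut (1.x shape).
public void onPut(String key, Record value) {

    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        datum.value = (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        dataStore.put(key, datum);
    }

    if (!listeners.containsKey(key)) {
        return;
    }

    // async notification; the Service listening on this key rebuilds its
    // in-memory instance list in onChange()
    notifier.addTask(key, ApplyAction.CHANGE);
}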
2. Step into taskDispatcher.addTask(key). The code is long, so it is reproduced in full below, together with the DataSyncer class it drives.

package com.alibaba.nacos.naming.consistency.ephemeral.distro;

import com.alibaba.fastjson.JSON;
import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.naming.misc.*;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/**
 * Data sync task dispatcher
 *
 * @author nkorange
 * @since 1.0.0
 */
@Component
public class TaskDispatcher {

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private DataSyncer dataSyncer;

    private List<TaskScheduler> taskSchedulerList = new ArrayList<>();

    private final int cpuCoreCount = Runtime.getRuntime().availableProcessors();

    // runs when this bean is loaded
    @PostConstruct
    public void init() {
        // create one scheduler per CPU core and submit each for execution
        for (int i = 0; i < cpuCoreCount; i++) {
            TaskScheduler taskScheduler = new TaskScheduler(i);
            taskSchedulerList.add(taskScheduler);
            GlobalExecutor.submitTaskDispatch(taskScheduler);
        }
    }

    public void addTask(String key) {
        // hash the key to pick one scheduler from the list to handle it
        taskSchedulerList.get(UtilsAndCommons.shakeUp(key, cpuCoreCount)).addTask(key);
    }

    // the scheduler task
    public class TaskScheduler implements Runnable {

        private int index;

        private int dataSize = 0;

        private long lastDispatchTime = 0L;

        private BlockingQueue<String> queue = new LinkedBlockingQueue<>(128 * 1024);

        public TaskScheduler(int index) {
            this.index = index;
        }

        public void addTask(String key) {
            queue.offer(key);
        }

        public int getIndex() {
            return index;
        }

        @Override
        public void run() {

            List<String> keys = new ArrayList<>();
            while (true) {

                try {
                    String key = queue.poll(partitionConfig.getTaskDispatchPeriod(),
                        TimeUnit.MILLISECONDS);

                    if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                        Loggers.DISTRO.debug("got key: {}", key);
                    }

                    if (dataSyncer.getServers() == null || dataSyncer.getServers().isEmpty()) {
                        continue;
                    }

                    if (StringUtils.isBlank(key)) {
                        continue;
                    }

                    if (dataSize == 0) {
                        keys = new ArrayList<>();
                    }

                    keys.add(key);
                    dataSize++;

                    // dispatch when the batch is full or the dispatch period has elapsed
                    if (dataSize == partitionConfig.getBatchSyncKeyCount() ||
                        (System.currentTimeMillis() - lastDispatchTime) > partitionConfig.getTaskDispatchPeriod()) {

                        for (Member member : dataSyncer.getServers()) {

                            if (NetUtils.localServer().equals(member.getAddress())) {
                                continue;
                            }
                            // create a sync task for this batch of keys
                            SyncTask syncTask = new SyncTask();
                            syncTask.setKeys(keys);
                            syncTask.setTargetServer(member.getAddress());

                            if (Loggers.DISTRO.isDebugEnabled() && StringUtils.isNotBlank(key)) {
                                Loggers.DISTRO.debug("add sync task: {}", JSON.toJSONString(syncTask));
                            }
                            // submit the sync task
                            dataSyncer.submit(syncTask, 0);
                        }
                        lastDispatchTime = System.currentTimeMillis();
                        dataSize = 0;
                    }

                } catch (Exception e) {
                    Loggers.DISTRO.error("dispatch sync task failed.", e);
                }
            }
        }
    }
}

And the DataSyncer class, which actually ships the data to the other nodes:

package com.alibaba.nacos.naming.consistency.ephemeral.distro;

import com.alibaba.nacos.core.cluster.Member;
import com.alibaba.nacos.core.cluster.ServerMemberManager;
import com.alibaba.nacos.naming.cluster.transport.Serializer;
import com.alibaba.nacos.naming.consistency.Datum;
import com.alibaba.nacos.naming.consistency.KeyBuilder;
import com.alibaba.nacos.naming.core.DistroMapper;
import com.alibaba.nacos.naming.misc.*;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Data replicator
 *
 * @author nkorange
 * @since 1.0.0
 */
@Component
@DependsOn("ProtocolManager")
public class DataSyncer {

    @Autowired
    private DataStore dataStore;

    @Autowired
    private GlobalConfig partitionConfig;

    @Autowired
    private Serializer serializer;

    @Autowired
    private DistroMapper distroMapper;

    @Autowired
    private ServerMemberManager memberManager;

    private Map<String, String> taskMap = new ConcurrentHashMap<>();

    @PostConstruct
    public void init() {
        startTimedSync();
    }

    public void submit(SyncTask task, long delay) {

        // If it's a new task:
        if (task.getRetryCount() == 0) {
            Iterator<String> iterator = task.getKeys().iterator();
            while (iterator.hasNext()) {
                String key = iterator.next();
                if (StringUtils.isNotBlank(taskMap.putIfAbsent(buildKey(key, task.getTargetServer()), key))) {
                    // associated key already exist:
                    if (Loggers.DISTRO.isDebugEnabled()) {
                        Loggers.DISTRO.debug("sync already in process, key: {}", key);
                    }
                    iterator.remove();
                }
            }
        }

        if (task.getKeys().isEmpty()) {
            // all keys are removed:
            return;
        }

        GlobalExecutor.submitDataSync(() -> {
            // 1. check the server
            if (getServers() == null || getServers().isEmpty()) {
                Loggers.SRV_LOG.warn("try to sync data but server list is empty.");
                return;
            }

            List<String> keys = task.getKeys();

            if (Loggers.SRV_LOG.isDebugEnabled()) {
                Loggers.SRV_LOG.debug("try to sync data for this keys {}.", keys);
            }

            // 2. get the datums by keys and check the datum is empty or not
            Map<String, Datum> datumMap = dataStore.batchGet(keys);
            if (datumMap == null || datumMap.isEmpty()) {
                // clear all flags of this task:
                for (String key : keys) {
                    taskMap.remove(buildKey(key, task.getTargetServer()));
                }
                return;
            }

            byte[] data = serializer.serialize(datumMap);

            long timestamp = System.currentTimeMillis();
            // sync the data to the target node
            boolean success = NamingProxy.syncData(data, task.getTargetServer());
            if (!success) {
                // retry with an incremented retry count
                SyncTask syncTask = new SyncTask();
                syncTask.setKeys(task.getKeys());
                syncTask.setRetryCount(task.getRetryCount() + 1);
                syncTask.setLastExecuteTime(timestamp);
                syncTask.setTargetServer(task.getTargetServer());
                retrySync(syncTask);
            } else {
                // clear all flags of this task:
                for (String key : task.getKeys()) {
                    taskMap.remove(buildKey(key, task.getTargetServer()));
                }
            }
        }, delay);
    }

    public void retrySync(SyncTask syncTask) {

        Member member = new Member();
        member.setIp(syncTask.getTargetServer().split(":")[0]);
        member.setPort(Integer.parseInt(syncTask.getTargetServer().split(":")[1]));
        if (!getServers().contains(member)) {
            // if server is no longer in healthy server list, ignore this task:
            // fix #1665 remove existing tasks
            if (syncTask.getKeys() != null) {
                for (String key : syncTask.getKeys()) {
                    taskMap.remove(buildKey(key, syncTask.getTargetServer()));
                }
            }
            return;
        }

        // TODO may choose other retry policy.
        submit(syncTask, partitionConfig.getSyncRetryDelay());
    }

    public void startTimedSync() {
        GlobalExecutor.schedulePartitionDataTimedSync(new TimedSync());
    }

    public class TimedSync implements Runnable {

        @Override
        public void run() {

            try {
                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("server list is: {}", getServers());
                }

                // send local timestamps to other servers:
                Map<String, String> keyChecksums = new HashMap<>(64);
                for (String key : dataStore.keys()) {
                    if (!distroMapper.responsible(KeyBuilder.getServiceName(key))) {
                        continue;
                    }
                    Datum datum = dataStore.get(key);
                    if (datum == null) {
                        continue;
                    }
                    keyChecksums.put(key, datum.value.getChecksum());
                }

                if (keyChecksums.isEmpty()) {
                    return;
                }

                if (Loggers.DISTRO.isDebugEnabled()) {
                    Loggers.DISTRO.debug("sync checksums: {}", keyChecksums);
                }

                for (Member member : getServers()) {
                    if (NetUtils.localServer().equals(member.getAddress())) {
                        continue;
                    }
                    NamingProxy.syncCheckSums(keyChecksums, member.getAddress());
                }
            } catch (Exception e) {
                Loggers.DISTRO.error("timed sync task failed.", e);
            }
        }
    }

    public Collection<Member> getServers() {
        return memberManager.allMembers();
    }

    public String buildKey(String key, String targetServer) {
        return key + UtilsAndCommons.CACHE_KEY_SPLITER + targetServer;
    }
}
The two classes above handle the synchronization. Besides the batched sync shown, com.alibaba.nacos.naming.consistency.ephemeral.distro.DataSyncer also runs a timed sync, which supplements the first one.
This second, timed sync packs each service key the local node is responsible for, together with that service's checksum, into a map and asynchronously sends it to the other nodes.
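What the receiving node does with those checksums is out of scope here, but the idea is a diff: compare the remote checksums against local data and pull fresh copies only for the keys that differ. A hypothetical sketch of that idea (names are illustrative, not the actual Nacos API):

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the checksum-diff step on the receiving side.
public class ChecksumDiff {

    public static List<String> keysToUpdate(Map<String, String> remoteChecksums,
                                            Map<String, String> localChecksums) {
        List<String> toUpdate = new ArrayList<>();
        for (Map.Entry<String, String> entry : remoteChecksums.entrySet()) {
            String local = localChecksums.get(entry.getKey());
            // missing locally, or checksum differs -> re-fetch this key
            if (local == null || !local.equals(entry.getValue())) {
                toUpdate.add(entry.getKey());
            }
        }
        return toUpdate;
    }
}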
Summary
There is still a lot I haven't fully figured out; I have only worked out the overall flow, and many details remain to be examined closely. If you spot any problems, please point them out.