一、Nacos服務端進行服務注冊
1.1 InstanceController#register
從這個Controller方法來看,先是解析出來instance,就是根據client發送的那堆參數解析出來的。
接着就是調用serviceManager組件進行實例注冊,這個serviceManager 組件在注冊中心是個核心組件,服務注冊,下線,獲取服務列表啥的,都是找這個組件的。
@CanDistro @PostMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception { // 得到namespaceId, 默認是public final String namespaceId = WebUtils .optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); // 獲取serviceName final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); // 解析instance實例 final Instance instance = parseInstance(request); // 進行注冊 serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok"; }
我們可以大概看下這個解析數來的instance:
1.2 serviceManager.registerInstance
/** * Register an instance to a service in AP mode. * * <p>This method creates service or cluster silently if they don't exist. * * @param namespaceId id of namespace * @param serviceName service name * @param instance instance to register * @throws Exception any error occurred in the process */ public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { createEmptyService(namespaceId, serviceName, instance.isEphemeral()); Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(NacosException.INVALID_PARAM, "service not found, namespace: " + namespaceId + ", service: " + serviceName); } addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }
1.2.1 ServiceManager#createEmptyService
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException { // 根據namespace和serviceName得到Service Service service = getService(namespaceId, serviceName); if (service == null) { Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); service = new Service(); service.setName(serviceName); service.setNamespaceId(namespaceId); // 設置group service.setGroupName(NamingUtils.getGroupName(serviceName)); // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); if (cluster != null) { cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster); } service.validate(); // put service並初始化 putServiceAndInit(service); if (!local) { addOrReplaceService(service); } } }
getService方法:本質就是從 private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>(); 中來獲取,最開始肯定是沒有,沒有就需要創建。
/** * Map(namespace, Map(group::serviceName, Service)). */ private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
public Service getService(String namespaceId, String serviceName) { if (serviceMap.get(namespaceId) == null) { return null; } return chooseServiceMap(namespaceId).get(serviceName); }
再來看下putServiceAndInit.
private void putServiceAndInit(Service service) throws NacosException { putService(service); service = getService(service.getNamespaceId(), service.getName()); service.init(); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); consistencyService .listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson()); }
putService就是把service放到serviceMap中去,key為namespaceId.
接下來看service.init(). 這里這個初始化有個非常重要的地方就是往健康檢查器中添加一個任務,健康檢查的任務,這個任務其實就是掃描這個service里面長時間沒有心跳的instance(服務實例),然后進行健康狀態改變,服務下線.
/** * Init service. */ public void init() { // 往健康Check中添加 HealthCheckReactor.scheduleCheck(clientBeatCheckTask); for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { entry.getValue().setService(this); entry.getValue().init(); } }
1.2.2 ServiceManager#registerInstance
再回到這個添加實例的方法
/** * Add instance to service. * * @param namespaceId namespace * @param serviceName service name * @param ephemeral whether instance is ephemeral * @param ips instances * @throws NacosException nacos exception */ public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { // 創建一個key String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); // 獲取Service Service service = getService(namespaceId, serviceName); synchronized (service) { // 更新,然后獲得這個服務的所有實例 List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); // 放到實例中 Instances instances = new Instances(); instances.setInstanceList(instanceList); // 調用保存--一致性服務的put方法 consistencyService.put(key, instances); } }
可以看到生成的key格式:
com.alibaba.nacos.naming.iplist.ephemeral.{namespace}##{serviceName}
接着就是獲取service, 上鎖,調用addIpAddresses 得到一個instance集合.
private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException { return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips); }
主要就是新的instance 與之前的instance進行合並啥的,生成一個新的instance集合。接着就是創建一個instances 對象,將instance集合塞進去
接下來,看這個保存方法:DelegateConsistencyServiceImpl#put
@Override public void put(String key, Record value) throws NacosException { mapConsistencyService(key).put(key, value); }
這個方法會根據你這個key是臨時的還是永久的選擇一個consisitencyService
private ConsistencyService mapConsistencyService(String key) { return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService; }
這里我們是臨時的,所以就走EphemeralConsistencyService 的實現類DistroConsistencyServiceImpl 的put方法。
DistroConsistencyServiceImpl#put
@Override public void put(String key, Record value) throws NacosException { onPut(key, value); distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2); }
重點關注onPut方法:
/** * Put a new record. * * @param key key of record * @param value record */ public void onPut(String key, Record value) { // 判斷節點是否臨時,主要是看key的前綴是否以這個字符串開頭”com.alibaba.nacos.naming.iplist.ephemeral“ if (KeyBuilder.matchEphemeralInstanceListKey(key)) { // 封裝對象Datum Datum<Instances> datum = new Datum<>(); datum.value = (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); // 放到dataStore進行存儲 dataStore.put(key, datum); } // 如果listener沒有這個key 直接返回 if (!listeners.containsKey(key)) { return; } // 添加通知任務 notifier.addTask(key, DataOperation.CHANGE); }
這個dataStore:
private Map<String, Datum> dataMap = new ConcurrentHashMap<>(1024); public void put(String key, Datum value) { dataMap.put(key, value); }
接下來,我們來看addTask:
/** * Add new notify task to queue. * * @param datumKey data key * @param action action for data */ public void addTask(String datumKey, DataOperation action) { // 如果已經存在且是change事件 if (services.containsKey(datumKey) && action == DataOperation.CHANGE) { return; } if (action == DataOperation.CHANGE) { // 緩存中也放一份 services.put(datumKey, StringUtils.EMPTY); } tasks.offer(Pair.with(datumKey, action)); }
這個tasks:
private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024);
這里其實就是往任務隊列中添加了一個任務。到這按理說我們服務注冊就該結束了,但是,我們發現生成了新的instance集合並沒有更新到service對象里面去,所以還得繼續往下看,看看這個通知任務是怎么回事。
其實DistroConsistencyServiceImpl 這個類在初始化的時候,然后提交了一個任務:
@PostConstruct public void init() { GlobalExecutor.submitDistroNotifyTask(notifier); }
這個提交任務的run方法做了什么事情呢?
@Override public void run() { Loggers.DISTRO.info("distro notifier started"); for (; ; ) { try { Pair<String, DataOperation> pair = tasks.take(); handle(pair); } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } } }
private void handle(Pair<String, DataOperation> pair) { try { String datumKey = pair.getValue0(); DataOperation action = pair.getValue1(); services.remove(datumKey); int count = 0; if (!listeners.containsKey(datumKey)) { return; } for (RecordListener listener : listeners.get(datumKey)) { count++; try { if (action == DataOperation.CHANGE) { // 數據更新 listener.onChange(datumKey, dataStore.get(datumKey).value); continue; } if (action == DataOperation.DELETE) { // 刪除 listener.onDelete(datumKey); continue; } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] error while notifying listener of key: {}", datumKey, e); } } if (Loggers.DISTRO.isDebugEnabled()) { Loggers.DISTRO .debug("[NACOS-DISTRO] datum change notified, key: {}, listener count: {}, action: {}", datumKey, count, action.name()); } } catch (Throwable e) { Loggers.DISTRO.error("[NACOS-DISTRO] Error while handling notifying task", e); } }
這里直接通知,調用listener 的onChange活着onDelete執行相關的工作。之前已經將service作為listener注冊進來了,看下service的onChange方法:
@Override public void onChange(String key, Instances value) throws Exception { Loggers.SRV_LOG.info("[NACOS-RAFT] datum is changed, key: {}, value: {}", key, value); for (Instance instance : value.getInstanceList()) { if (instance == null) { // Reject this abnormal instance list: throw new RuntimeException("got null instance " + key); } if (instance.getWeight() > 10000.0D) { instance.setWeight(10000.0D); } if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) { instance.setWeight(0.01D); } } updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key)); recalculateChecksum(); }
核心在updateIps這個方法:參數1是新的instance集合,參數2是是否是臨時節點這個方法其實就是遍歷instance集合,然后更新clusterMap 這個里面的內容,這個clusterMap 其實就是clusterName 與cluster的對應關系,從代碼上可以看到實現弄出所有的cluster,然后遍歷instance集合,如果沒有某個instance沒有cluster,就設置成默認DEFAULT_CLUSTER_NAME,如果某個cluster沒有的話就創建。然后塞到一個cluster與instance集合對應關系的map中。
接着就是遍歷clusterMap更新下instance列表,主要還是比對新老的,然后找出新的instance,與掛了的instance,注意這一步是更新 cluster對象里面的集合,其實就是2個set,一個存臨時節點的,一個是存永久節點的。
/** * Update instances. * * @param instances instances * @param ephemeral whether is ephemeral instance */ public void updateIPs(Collection<Instance> instances, boolean ephemeral) { Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size()); for (String clusterName : clusterMap.keySet()) { ipMap.put(clusterName, new ArrayList<>()); } for (Instance instance : instances) { try { if (instance == null) { Loggers.SRV_LOG.error("[NACOS-DOM] received malformed ip: null"); continue; } if (StringUtils.isEmpty(instance.getClusterName())) { instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME); } if (!clusterMap.containsKey(instance.getClusterName())) { Loggers.SRV_LOG .warn("cluster: {} not found, ip: {}, will create new cluster with default configuration.", instance.getClusterName(), instance.toJson()); Cluster cluster = new Cluster(instance.getClusterName(), this); cluster.init(); getClusterMap().put(instance.getClusterName(), cluster); } List<Instance> clusterIPs = ipMap.get(instance.getClusterName()); if (clusterIPs == null) { clusterIPs = new LinkedList<>(); ipMap.put(instance.getClusterName(), clusterIPs); } clusterIPs.add(instance); } catch (Exception e) { Loggers.SRV_LOG.error("[NACOS-DOM] failed to process ip: " + instance, e); } } for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) { //make every ip mine List<Instance> entryIPs = entry.getValue(); clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral); } setLastModifiedMillis(System.currentTimeMillis()); getPushService().serviceChanged(this); StringBuilder stringBuilder = new StringBuilder(); for (Instance instance : allIPs()) { stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(","); } Loggers.EVT_LOG.info("[IP-UPDATED] namespace: {}, service: {}, ips: {}", getNamespaceId(), getName(), stringBuilder.toString()); }
到以上,服務注冊就完成了。
二、回顧一下整個過程
本篇就到這里,歡迎圍觀評論及批評指正。下一篇繼續。