Basic configuration initialization
NacosDiscoveryAutoConfiguration
NacosDiscoveryProperties
Initializes the bean that holds the basic Nacos configuration, mainly the Nacos service settings declared in the YAML file.
NacosServiceDiscovery
Initializes the bean used to look up Nacos services and instances; service and instance information can be obtained through it.
NacosDiscoveryClientConfiguration
NacosDiscoveryClient
Initializes the NacosDiscoveryClient bean, which is essentially a thin wrapper that delegates to NacosServiceDiscovery. Its job is to return instance and service information.
NacosWatch
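Looking at its type hierarchy, NacosWatch mainly implements SmartLifecycle and ApplicationEventPublisherAware. ApplicationEventPublisherAware gives the bean access to the event publisher; here it is used to publish HeartbeatEvent, which reports the heartbeat. SmartLifecycle lets a bean run its own initialization once all beans have been created, and release resources when the application shuts down. The start method of NacosWatch does four things:
- registers a NamingEvent listener;
- obtains the NamingService;
- subscribes the listener to NamingService events;
- publishes HeartbeatEvent;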
    public void start() {
        // Proceed only if this call flips running from false to true
        if (this.running.compareAndSet(false, true)) {
            // Register the NamingEvent listener, which refreshes the local Instance
            EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
                    event -> new EventListener() {
                        @Override
                        public void onEvent(Event event) {
                            if (event instanceof NamingEvent) {
                                List<Instance> instances = ((NamingEvent) event)
                                        .getInstances();
                                Optional<Instance> instanceOptional = selectCurrentInstance(
                                        instances);
                                instanceOptional.ifPresent(currentInstance -> {
                                    resetIfNeeded(currentInstance);
                                });
                            }
                        }
                    });
            // Obtain the NamingService
            NamingService namingService = nacosServiceManager
                    .getNamingService(properties.getNacosProperties());
            try {
                // Subscribe the listener to the NamingService events of this service
                namingService.subscribe(properties.getService(), properties.getGroup(),
                        Arrays.asList(properties.getClusterName()), eventListener);
            }
            catch (Exception e) {
                log.error("namingService subscribe failed, properties:{}", properties, e);
            }
            // Publish HeartbeatEvent periodically through the scheduled nacosServicesWatch task
            this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
                    this::nacosServicesWatch, this.properties.getWatchDelay());
        }
    }
The stop method of NacosWatch does two things:
- releases the thread pool used by the watch task;
- cancels the NamingService subscription;
    @Override
    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            // Shut down and release the watch thread pool
            if (this.watchFuture != null) {
                // shutdown current user-thread,
                // then the other daemon-threads will terminate automatic.
                ((ThreadPoolTaskScheduler) this.taskScheduler).shutdown();
                this.watchFuture.cancel(true);
            }
            EventListener eventListener = listenerMap.get(buildKey());
            try {
                // Cancel the NamingService subscription
                NamingService namingService = nacosServiceManager
                        .getNamingService(properties.getNacosProperties());
                namingService.unsubscribe(properties.getService(), properties.getGroup(),
                        Arrays.asList(properties.getClusterName()), eventListener);
            }
            catch (NacosException e) {
                log.error("namingService unsubscribe failed, properties:{}", properties, e);
            }
        }
    }
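Both methods are guarded by running.compareAndSet, so repeated calls to start or stop only take effect once. The following is a minimal sketch of that guard in plain Java, without Spring. WatchLifecycleSketch and its members are hypothetical names used for illustration, and a ScheduledExecutorService stands in for the Spring task scheduler.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for NacosWatch; not the Spring Cloud Alibaba class.
class WatchLifecycleSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger startCount = new AtomicInteger();
    private ScheduledExecutorService scheduler;
    private ScheduledFuture<?> watchFuture;

    // Only the call that flips running from false to true does the work.
    void start() {
        if (running.compareAndSet(false, true)) {
            startCount.incrementAndGet();
            scheduler = Executors.newSingleThreadScheduledExecutor();
            watchFuture = scheduler.scheduleWithFixedDelay(
                    () -> { /* a real watch would publish a heartbeat event here */ },
                    0, 1, TimeUnit.SECONDS);
        }
    }

    // Only the call that flips running from true to false releases resources.
    void stop() {
        if (running.compareAndSet(true, false)) {
            watchFuture.cancel(true);
            scheduler.shutdown();
        }
    }

    boolean isRunning() { return running.get(); }

    int startCount() { return startCount.get(); }
}
```

Calling start twice schedules the task once, and calling stop twice shuts the scheduler down once.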
NacosNamingService
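Client information is initialized lazily, when the first call is made, rather than when application startup completes; we will cover that in detail when we analyze the Ribbon source code. Our focus here is NacosNamingService. Tracing the getInstances method of NacosServiceDiscovery (one of the basic configuration beans) down to the lower layers shows that the key class for talking to the server is NacosNamingService. During initialization, NacosNamingService mainly does the following ten things:
- initNamespaceForNaming: initializes the namespace. In Nacos, namespaces provide coarse-grained tenant isolation and can also separate environments such as development and testing;
- initSerialization: initializes serialization;
- initServerAddr: initializes the server address, including the endpoint;
- initWebRootContext: initializes the web context, which supports deployment through Alibaba Cloud EDAS;
- initCacheDir: initializes the cache directory;
- initLogName: reads the log file name from the configuration;
- EventDispatcher: dispatches change events. When a client subscribes to a service, its Listener is registered with the EventDispatcher, and subscribers are notified when the service changes;
- NamingProxy: the proxy for the server, used for communication between client and server;
- BeatReactor: maintains the heartbeat with the server and reports the instances the client has registered;
- HostReactor: handles client-side service subscriptions and refreshes service information from the server;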

initNamespaceForNaming
    // Resolve the namespace
    public static String initNamespaceForNaming(Properties properties) {
        String tmpNamespace = null;

        // Whether to use cloud namespace parsing. If it is not configured,
        // DEFAULT_USE_CLOUD_NAMESPACE_PARSING (true) is used.
        String isUseCloudNamespaceParsing = properties.getProperty(PropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
                System.getProperty(SystemPropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
                        String.valueOf(Constants.DEFAULT_USE_CLOUD_NAMESPACE_PARSING)));

        if (Boolean.parseBoolean(isUseCloudNamespaceParsing)) {
            tmpNamespace = TenantUtil.getUserTenantForAns();
            // Fall back to the ANS_NAMESPACE system property
            tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
                @Override
                public String call() {
                    String namespace = System.getProperty(SystemPropertyKeyConst.ANS_NAMESPACE);
                    LogUtils.NAMING_LOGGER.info("initializer namespace from System Property :" + namespace);
                    return namespace;
                }
            });

            // Fall back to the ALIBABA_ALIWARE_NAMESPACE environment variable
            tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
                @Override
                public String call() {
                    String namespace = System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_NAMESPACE);
                    LogUtils.NAMING_LOGGER.info("initializer namespace from System Environment :" + namespace);
                    return namespace;
                }
            });
        }

        // If still empty (for example, not a cloud environment), read the NAMESPACE system property
        tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
            @Override
            public String call() {
                String namespace = System.getProperty(PropertyKeyConst.NAMESPACE);
                LogUtils.NAMING_LOGGER.info("initializer namespace from System Property :" + namespace);
                return namespace;
            }
        });

        // Then read the namespace from properties
        if (StringUtils.isEmpty(tmpNamespace) && properties != null) {
            tmpNamespace = properties.getProperty(PropertyKeyConst.NAMESPACE);
        }

        // Finally fall back to the default namespace
        tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
            @Override
            public String call() {
                return UtilAndComs.DEFAULT_NAMESPACE_ID;
            }
        });
        return tmpNamespace;
    }
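The method above is a fallback chain: each source is consulted only if the previous ones produced an empty value. Here is a minimal sketch of that idea using suppliers. NamespaceFallbackSketch, firstNonEmpty and resolve are hypothetical names; the property key "namespace" and the default "public" are assumptions standing in for PropertyKeyConst.NAMESPACE and UtilAndComs.DEFAULT_NAMESPACE_ID.

```java
import java.util.Properties;
import java.util.function.Supplier;

// Hypothetical illustration of the fallback order; not Nacos code.
class NamespaceFallbackSketch {

    // Returns the first candidate whose value is non-null and non-empty, or null.
    @SafeVarargs
    static String firstNonEmpty(Supplier<String>... candidates) {
        for (Supplier<String> candidate : candidates) {
            String value = candidate.get();
            if (value != null && !value.isEmpty()) {
                return value;
            }
        }
        return null;
    }

    // cloudValue and systemValue stand in for the cloud and system-property lookups.
    static String resolve(String cloudValue, String systemValue, Properties props) {
        return firstNonEmpty(
                () -> cloudValue,
                () -> systemValue,
                () -> props == null ? null : props.getProperty("namespace"), // assumed key
                () -> "public");                                             // assumed default
    }
}
```

Because the candidates are suppliers, later lookups are not evaluated once an earlier one has produced a value.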
initServerAddr
    // Initialize the server address
    private void initServerAddr(Properties properties) {
        // Read the server address list from properties
        serverList = properties.getProperty(PropertyKeyConst.SERVER_ADDR);
        // Initialize the endpoint; if an endpoint is present, serverList is discarded
        endpoint = InitUtils.initEndpoint(properties);
        if (StringUtils.isNotEmpty(endpoint)) {
            serverList = "";
        }
    }

    public static String initEndpoint(final Properties properties) {
        if (properties == null) {
            return "";
        }
        // Whether to enable domain name resolution rules
        // Defaults to true, i.e. USE_ENDPOINT_PARSING_RULE_DEFAULT_VALUE
        String isUseEndpointRuleParsing = properties.getProperty(PropertyKeyConst.IS_USE_ENDPOINT_PARSING_RULE,
                System.getProperty(SystemPropertyKeyConst.IS_USE_ENDPOINT_PARSING_RULE,
                        String.valueOf(ParamUtil.USE_ENDPOINT_PARSING_RULE_DEFAULT_VALUE)));

        boolean isUseEndpointParsingRule = Boolean.valueOf(isUseEndpointRuleParsing);
        String endpointUrl;
        // Endpoint parsing is enabled
        if (isUseEndpointParsingRule) {
            // Get the set domain name information
            endpointUrl = ParamUtil.parsingEndpointRule(properties.getProperty(PropertyKeyConst.ENDPOINT));
            if (StringUtils.isBlank(endpointUrl)) {
                return "";
            }
        } else {
            // Otherwise read the endpoint directly from properties
            endpointUrl = properties.getProperty(PropertyKeyConst.ENDPOINT);
        }

        if (StringUtils.isBlank(endpointUrl)) {
            return "";
        }

        // Resolve the endpoint port: environment variable first, then properties
        String endpointPort = TemplateUtils.stringEmptyAndThenExecute(
                System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_ENDPOINT_PORT), new Callable<String>() {
                    @Override
                    public String call() {
                        return properties.getProperty(PropertyKeyConst.ENDPOINT_PORT);
                    }
                });

        // Default to 8080 if no port is configured
        endpointPort = TemplateUtils.stringEmptyAndThenExecute(endpointPort, new Callable<String>() {
            @Override
            public String call() {
                return "8080";
            }
        });

        return endpointUrl + ":" + endpointPort;
    }
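To make the precedence concrete, here is a small sketch of the two decisions in the code above: how the port is chosen, and when serverList is discarded. EndpointSketch and its methods are hypothetical; the host, the environment port and the properties port are passed in as plain strings instead of being read from the environment.

```java
// Hypothetical illustration of the endpoint rules; not Nacos code.
class EndpointSketch {

    static boolean isEmpty(String s) {
        return s == null || s.isEmpty();
    }

    static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }

    // Port precedence: environment value, then properties value, then "8080".
    static String buildEndpoint(String endpointHost, String envPort, String propPort) {
        if (isBlank(endpointHost)) {
            return "";
        }
        String port = !isEmpty(envPort) ? envPort : (!isEmpty(propPort) ? propPort : "8080");
        return endpointHost + ":" + port;
    }

    // A non-empty endpoint wins over the static server address list.
    static String effectiveServerList(String serverAddr, String endpoint) {
        return isEmpty(endpoint) ? serverAddr : "";
    }
}
```

For example, buildEndpoint("nacos.example.com", null, null) yields "nacos.example.com:8080", and once that endpoint is set the configured server list is replaced by an empty string.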
initWebRootContext
    // Alibaba Cloud EDAS related
    public static void initWebRootContext() {
        // support the web context with ali-yun if the app deploy by EDAS
        final String webContext = System.getProperty(SystemPropertyKeyConst.NAMING_WEB_CONTEXT);
        TemplateUtils.stringNotEmptyAndThenExecute(webContext, new Runnable() {
            @Override
            public void run() {
                UtilAndComs.webContext = webContext.indexOf("/") > -1 ? webContext : "/" + webContext;
                UtilAndComs.nacosUrlBase = UtilAndComs.webContext + "/v1/ns";
                UtilAndComs.nacosUrlInstance = UtilAndComs.nacosUrlBase + "/instance";
            }
        });
    }
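One detail worth noticing is that the check is indexOf("/") > -1, so a slash anywhere in the value, not only a leading one, suppresses the "/" prefix. The sketch below reproduces just that expression and the two URL suffixes from the code above; WebContextSketch is a hypothetical name.

```java
// Hypothetical illustration of the web context rule; not Nacos code.
class WebContextSketch {

    // Prefix "/" only when the value contains no slash at all.
    static String normalize(String webContext) {
        return webContext.indexOf("/") > -1 ? webContext : "/" + webContext;
    }

    // Mirrors webContext + "/v1/ns" + "/instance" from the listing above.
    static String instanceUrl(String webContext) {
        return normalize(webContext) + "/v1/ns" + "/instance";
    }
}
```

A value such as "a/b" is therefore left without a leading slash, which may or may not be what a deployment expects.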
initCacheDir
    // Initialize the cache directory, which stores the service information fetched from the server.
    // If the client loses its connection to the server, the cached information is used.
    private void initCacheDir() {
        cacheDir = System.getProperty("com.alibaba.nacos.naming.cache.dir");
        if (StringUtils.isEmpty(cacheDir)) {
            cacheDir = System.getProperty("user.home") + "/nacos/naming/" + namespace;
        }
    }
initLogName
    // Initialize the log file name
    private void initLogName(Properties properties) {
        logName = System.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME);
        if (StringUtils.isEmpty(logName)) {
            if (properties != null && StringUtils
                    .isNotEmpty(properties.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME))) {
                logName = properties.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME);
            } else {
                logName = "naming.log";
            }
        }
    }
EventDispatcher
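EventDispatcher maintains a blocking queue that holds the services whose information has changed, plus a Map from each service to its list of listeners. A notifier thread drains the queue and pushes each change to the listeners of that service, so a client can react dynamically to service changes simply by registering a listener.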
    public class EventDispatcher implements Closeable {

        private ExecutorService executor = null;

        // Queue of services whose information has changed
        private final BlockingQueue<ServiceInfo> changedServices = new LinkedBlockingQueue<ServiceInfo>();

        // Mapping from service key to its listeners
        private final ConcurrentMap<String, List<EventListener>> observerMap = new ConcurrentHashMap<String, List<EventListener>>();

        private volatile boolean closed = false;

        public EventDispatcher() {
            this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
                    thread.setDaemon(true);
                    return thread;
                }
            });
            this.executor.execute(new Notifier());
        }

        /**
         * Add listener.
         *
         * @param serviceInfo service info
         * @param clusters    clusters
         * @param listener    listener
         */
        public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {
            NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");
            List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
            observers.add(listener);

            observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
            if (observers != null) {
                observers.add(listener);
            }

            serviceChanged(serviceInfo);
        }

        /**
         * Remove listener.
         *
         * @param serviceName service name
         * @param clusters    clusters
         * @param listener    listener
         */
        public void removeListener(String serviceName, String clusters, EventListener listener) {
            NAMING_LOGGER.info("[LISTENER] removing " + serviceName + " with " + clusters + " from listener map");
            List<EventListener> observers = observerMap.get(ServiceInfo.getKey(serviceName, clusters));
            if (observers != null) {
                Iterator<EventListener> iter = observers.iterator();
                while (iter.hasNext()) {
                    EventListener oldListener = iter.next();
                    if (oldListener.equals(listener)) {
                        iter.remove();
                    }
                }
                if (observers.isEmpty()) {
                    observerMap.remove(ServiceInfo.getKey(serviceName, clusters));
                }
            }
        }

        public boolean isSubscribed(String serviceName, String clusters) {
            return observerMap.containsKey(ServiceInfo.getKey(serviceName, clusters));
        }

        public List<ServiceInfo> getSubscribeServices() {
            List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>();
            for (String key : observerMap.keySet()) {
                serviceInfos.add(ServiceInfo.fromKey(key));
            }
            return serviceInfos;
        }

        /**
         * Service changed.
         *
         * @param serviceInfo service info
         */
        public void serviceChanged(ServiceInfo serviceInfo) {
            if (serviceInfo == null) {
                return;
            }
            changedServices.add(serviceInfo);
        }

        @Override
        public void shutdown() throws NacosException {
            String className = this.getClass().getName();
            NAMING_LOGGER.info("{} do shutdown begin", className);
            ThreadUtils.shutdownThreadPool(executor, NAMING_LOGGER);
            closed = true;
            NAMING_LOGGER.info("{} do shutdown stop", className);
        }

        // Thread that notifies listeners of service changes
        private class Notifier implements Runnable {

            @Override
            public void run() {
                while (!closed) {
                    ServiceInfo serviceInfo = null;
                    try {
                        // Take the next change from the queue
                        serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
                    } catch (Exception ignore) {
                    }

                    if (serviceInfo == null) {
                        continue;
                    }

                    try {
                        // Look up the listeners of this service
                        List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
                        // Call onEvent on each listener
                        if (!CollectionUtils.isEmpty(listeners)) {
                            for (EventListener listener : listeners) {
                                List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
                                listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                                        serviceInfo.getClusters(), hosts));
                            }
                        }
                    } catch (Exception e) {
                        NAMING_LOGGER.error("[NA] notify error for service: " + serviceInfo.getName() + ", clusters: "
                                + serviceInfo.getClusters(), e);
                    }
                }
            }
        }
    }
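Stripped of the Nacos types, the class above is a queue of changed keys, a map of listeners, and a single daemon thread connecting the two. The following sketch shows that shape with plain strings as events. MiniDispatcher and all of its members are hypothetical names, and it uses computeIfAbsent with a CopyOnWriteArrayList rather than the putIfAbsent plus synchronized list of the original.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified dispatcher; service names double as events.
class MiniDispatcher {
    private final BlockingQueue<String> changed = new LinkedBlockingQueue<>();
    private final Map<String, List<Consumer<String>>> observers = new ConcurrentHashMap<>();
    private final ExecutorService executor;
    private volatile boolean closed = false;

    MiniDispatcher() {
        executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "mini-dispatcher");
            t.setDaemon(true);
            return t;
        });
        executor.execute(this::notifyLoop);
    }

    // Register a listener and enqueue the service so the listener gets an initial event.
    void addListener(String service, Consumer<String> listener) {
        observers.computeIfAbsent(service, k -> new CopyOnWriteArrayList<>()).add(listener);
        serviceChanged(service);
    }

    void serviceChanged(String service) {
        changed.add(service);
    }

    // Drain the queue and notify every listener of the changed service.
    private void notifyLoop() {
        while (!closed) {
            try {
                String service = changed.poll(100, TimeUnit.MILLISECONDS);
                if (service == null) {
                    continue;
                }
                for (Consumer<String> l : observers.getOrDefault(service, Collections.emptyList())) {
                    l.accept(service);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    void shutdown() {
        closed = true;
        executor.shutdownNow();
    }

    // Usage example: returns the events seen by a single listener.
    static List<String> demo() {
        MiniDispatcher d = new MiniDispatcher();
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch latch = new CountDownLatch(1);
        d.addListener("orders", s -> { seen.add(s); latch.countDown(); });
        try {
            latch.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            d.shutdown();
        }
        return seen;
    }
}
```

Keeping notification on its own thread means a slow listener delays other notifications but never blocks the code that reports a change.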
NamingProxy
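NamingProxy encapsulates the operations against the server, and its code is relatively simple. Note that if security settings are configured, a scheduled query task is started here during initialization. If your security requirements are high, you can rewrite the SecurityProxy part, in which case the server side has to be rewritten as well. In most cases the registry is only exposed on an internal network, so a rewrite is rarely needed.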
BeatReactor
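BeatReactor is responsible for reporting client instances to the server and taking them offline. Non-persistent (ephemeral) instances are reported periodically. We covered this in the article on service heartbeats, so we will not walk through the source here; focus on addBeatInfo, removeBeatInfo and BeatTask, which are relatively simple.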
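Since the source is not shown here, the following is only a rough sketch of the general pattern those three names suggest: a map of active beats, a task that re-schedules itself after each heartbeat, and a stopped flag that ends the loop on removal. BeatSketch and all of its members are hypothetical, the "#" key separator is an assumption, and the counter increment stands in for the real request to the server.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical heartbeat scheduler; not the Nacos BeatReactor.
class BeatSketch {
    private static final class Beat {
        volatile boolean stopped;
    }

    private final Map<String, Beat> beats = new ConcurrentHashMap<>();
    private final AtomicInteger sent = new AtomicInteger();
    private final long periodMillis;
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "beat-sketch");
                t.setDaemon(true);
                return t;
            });

    BeatSketch(long periodMillis) {
        this.periodMillis = periodMillis;
    }

    // Assumed key layout: serviceName#ip#port.
    static String buildKey(String serviceName, String ip, int port) {
        return serviceName + "#" + ip + "#" + port;
    }

    // Register a beat; an older beat under the same key is stopped first.
    void addBeat(String key) {
        Beat beat = new Beat();
        Beat previous = beats.put(key, beat);
        if (previous != null) {
            previous.stopped = true;
        }
        executor.schedule(() -> run(beat), 0, TimeUnit.MILLISECONDS);
    }

    // Remove the beat and flag it so the pending task ends instead of re-scheduling.
    void removeBeat(String key) {
        Beat beat = beats.remove(key);
        if (beat != null) {
            beat.stopped = true;
        }
    }

    // One heartbeat, then schedule the next one.
    private void run(Beat beat) {
        if (beat.stopped) {
            return;
        }
        sent.incrementAndGet(); // a real client would send the heartbeat request here
        executor.schedule(() -> run(beat), periodMillis, TimeUnit.MILLISECONDS);
    }

    boolean isRegistered(String key) {
        return beats.containsKey(key);
    }

    int sentCount() {
        return sent.get();
    }

    void shutdown() {
        executor.shutdownNow();
    }
}
```

Re-scheduling from inside the task, instead of using a fixed-rate schedule, lets each beat pick its own next delay and stop cleanly once it is flagged.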
HostReactor
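HostReactor is mainly responsible for fetching the information registered on the server. It has three parts:
- when the client calls a NacosNamingService method to fetch service information, HostReactor stores that information in the local cache serviceInfoMap and uses UpdateTask to refresh known services periodically;
- HostReactor holds a PushReceiver, which receives service changes pushed by the server over UDP and applies them to the local cache serviceInfoMap;
- HostReactor holds a FailoverReactor, which switches to local file cache mode when the server is unavailable and reads service information from the cached files;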
    public class HostReactor implements Closeable {

        private static final long DEFAULT_DELAY = 1000L;

        private static final long UPDATE_HOLD_INTERVAL = 5000L;

        private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();

        private final Map<String, ServiceInfo> serviceInfoMap;

        private final Map<String, Object> updatingMap;

        // Receives pushes sent by the server over UDP
        private final PushReceiver pushReceiver;

        // Dispatches change events through a blocking queue
        private final EventDispatcher eventDispatcher;

        // Heartbeat reporting
        private final BeatReactor beatReactor;

        // Handles HTTP requests to the server
        private final NamingProxy serverProxy;

        // Local file fallback used when the server is unavailable
        private final FailoverReactor failoverReactor;

        private final String cacheDir;

        // Executor for the periodic service update tasks
        private final ScheduledExecutorService executor;

        public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor,
                String cacheDir) {
            this(eventDispatcher, serverProxy, beatReactor, cacheDir, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
        }

        public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor,
                String cacheDir, boolean loadCacheAtStart, int pollingThreadCount) {
            // init executorService
            this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r);
                    thread.setDaemon(true);
                    thread.setName("com.alibaba.nacos.client.naming.updater");
                    return thread;
                }
            });
            this.eventDispatcher = eventDispatcher;
            this.beatReactor = beatReactor;
            this.serverProxy = serverProxy;
            this.cacheDir = cacheDir;
            if (loadCacheAtStart) {
                this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
            } else {
                this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
            }

            this.updatingMap = new ConcurrentHashMap<String, Object>();
            this.failoverReactor = new FailoverReactor(this, cacheDir);
            this.pushReceiver = new PushReceiver(this);
        }

        public Map<String, ServiceInfo> getServiceInfoMap() {
            return serviceInfoMap;
        }

        public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
            return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
        }

        /**
         * Process service json.
         *
         * @param json service json
         * @return service info
         */
        // Handles the data received from the server
        public ServiceInfo processServiceJson(String json) {
            ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
            ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
            if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
                //empty or error push, just ignore
                return oldService;
            }

            boolean changed = false;

            // Compare the new information with the old
            if (oldService != null) {

                // If the local copy is newer than the one from the server, only a warning is logged;
                // the server data still replaces the local copy below
                if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
                    NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
                            + serviceInfo.getLastRefTime());
                }

                // Replace the entry in serviceInfoMap with the new service information
                serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);

                Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
                for (Instance host : oldService.getHosts()) {
                    oldHostMap.put(host.toInetAddr(), host);
                }

                Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
                for (Instance host : serviceInfo.getHosts()) {
                    newHostMap.put(host.toInetAddr(), host);
                }

                Set<Instance> modHosts = new HashSet<Instance>();
                Set<Instance> newHosts = new HashSet<Instance>();
                Set<Instance> remvHosts = new HashSet<Instance>();

                List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
                        newHostMap.entrySet());
                for (Map.Entry<String, Instance> entry : newServiceHosts) {
                    Instance host = entry.getValue();
                    String key = entry.getKey();
                    if (oldHostMap.containsKey(key) && !StringUtils
                            .equals(host.toString(), oldHostMap.get(key).toString())) {
                        modHosts.add(host);
                        continue;
                    }

                    if (!oldHostMap.containsKey(key)) {
                        newHosts.add(host);
                    }
                }

                for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
                    Instance host = entry.getValue();
                    String key = entry.getKey();
                    if (newHostMap.containsKey(key)) {
                        continue;
                    }

                    if (!newHostMap.containsKey(key)) {
                        remvHosts.add(host);
                    }
                }

                if (newHosts.size() > 0) {
                    changed = true;
                    NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                            + JacksonUtils.toJson(newHosts));
                }

                if (remvHosts.size() > 0) {
                    changed = true;
                    NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                            + JacksonUtils.toJson(remvHosts));
                }

                if (modHosts.size() > 0) {
                    changed = true;
                    updateBeatInfo(modHosts);
                    NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                            + JacksonUtils.toJson(modHosts));
                }

                serviceInfo.setJsonFromServer(json);

                if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
                    // Put the changed service into the blocking queue
                    eventDispatcher.serviceChanged(serviceInfo);
                    // Write it to the disk cache
                    DiskCache.write(serviceInfo, cacheDir);
                }

            } else {
                // A new service goes straight into the local cache
                changed = true;
                NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(serviceInfo.getHosts()));
                serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
                eventDispatcher.serviceChanged(serviceInfo);
                serviceInfo.setJsonFromServer(json);
                DiskCache.write(serviceInfo, cacheDir);
            }

            // Report the size of serviceInfoMap as a metric
            MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());

            if (changed) {
                NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(serviceInfo.getHosts()));
            }

            return serviceInfo;
        }

        private void updateBeatInfo(Set<Instance> modHosts) {
            for (Instance instance : modHosts) {
                String key = beatReactor.buildKey(instance.getServiceName(), instance.getIp(), instance.getPort());
                if (beatReactor.dom2Beat.containsKey(key) && instance.isEphemeral()) {
                    BeatInfo beatInfo = beatReactor.buildBeatInfo(instance);
                    beatReactor.addBeatInfo(instance.getServiceName(), beatInfo);
                }
            }
        }

        // Look up the service object by key
        private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
            // Build the ServiceInfo key
            String key = ServiceInfo.getKey(serviceName, clusters);
            // Read the service information from the local cache
            return serviceInfoMap.get(key);
        }

        // Fetch the service from the server and parse it into a ServiceInfo object
        public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters)
                throws NacosException {
            String result = serverProxy.queryList(serviceName, clusters, 0, false);
            if (StringUtils.isNotEmpty(result)) {
                return JacksonUtils.toObj(result, ServiceInfo.class);
            }
            return null;
        }

        public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {

            NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
            String key = ServiceInfo.getKey(serviceName, clusters);
            // If local file cache (failover) mode is on, serve from the failover cache
            if (failoverReactor.isFailoverSwitch()) {
                return failoverReactor.getService(key);
            }

            // Read serviceObj from serviceInfoMap; if it is missing, create a new one
            ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);

            if (null == serviceObj) {
                serviceObj = new ServiceInfo(serviceName, clusters);
                serviceInfoMap.put(serviceObj.getKey(), serviceObj);
                updatingMap.put(serviceName, new Object());
                updateServiceNow(serviceName, clusters);
                updatingMap.remove(serviceName);
            } else if (updatingMap.containsKey(serviceName)) {
                // If the service is currently being updated, wait for the update to finish
                if (UPDATE_HOLD_INTERVAL > 0) {
                    // hold a moment waiting for update finish
                    synchronized (serviceObj) {
                        try {
                            serviceObj.wait(UPDATE_HOLD_INTERVAL);
                        } catch (InterruptedException e) {
                            NAMING_LOGGER
                                    .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                        }
                    }
                }
            }

            // Schedule the periodic update task if it does not exist yet
            scheduleUpdateIfAbsent(serviceName, clusters);
            return serviceInfoMap.get(serviceObj.getKey());
        }

        // Update the service from the server immediately
        private void updateServiceNow(String serviceName, String clusters) {
            try {
                updateService(serviceName, clusters);
            } catch (NacosException e) {
                NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
            }
        }

        /**
         * Schedule update if absent.
         *
         * @param serviceName service name
         * @param clusters    clusters
         */
        // Schedule the periodic update task
        public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }

            synchronized (futureMap) {
                if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                    return;
                }

                ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
                futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
            }
        }

        /**
         * Update service now.
         *
         * @param serviceName service name
         * @param clusters    clusters
         */
        public void updateService(String serviceName, String clusters) throws NacosException {
            ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
            try {
                String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
                if (StringUtils.isNotEmpty(result)) {
                    processServiceJson(result);
                }
            } finally {
                if (oldService != null) {
                    synchronized (oldService) {
                        oldService.notifyAll();
                    }
                }
            }
        }

        /**
         * Refresh only.
         *
         * @param serviceName service name
         * @param clusters    cluster
         */
        public void refreshOnly(String serviceName, String clusters) {
            try {
                serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
            } catch (Exception e) {
                NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
            }
        }

        @Override
        public void shutdown() throws NacosException {
            String className = this.getClass().getName();
            NAMING_LOGGER.info("{} do shutdown begin", className);
            ThreadUtils.shutdownThreadPool(executor, NAMING_LOGGER);
            pushReceiver.shutdown();
            failoverReactor.shutdown();
            NAMING_LOGGER.info("{} do shutdown stop", className);
        }

        // Periodically refreshes a known service
        public class UpdateTask implements Runnable {

            long lastRefTime = Long.MAX_VALUE;

            private final String clusters;

            private final String serviceName;

            /**
             * the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty
             */
            private int failCount = 0;

            public UpdateTask(String serviceName, String clusters) {
                this.serviceName = serviceName;
                this.clusters = clusters;
            }

            private void incFailCount() {
                int limit = 6;
                if (failCount == limit) {
                    return;
                }
                failCount++;
            }

            private void resetFailCount() {
                failCount = 0;
            }

            @Override
            public void run() {
                long delayTime = DEFAULT_DELAY;

                try {
                    ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));

                    if (serviceObj == null) {
                        updateService(serviceName, clusters);
                        return;
                    }

                    if (serviceObj.getLastRefTime() <= lastRefTime) {
                        updateService(serviceName, clusters);
                        serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                    } else {
                        // if serviceName already updated by push, we should not override it
                        // since the push data may be different from pull through force push
                        refreshOnly(serviceName, clusters);
                    }

                    lastRefTime = serviceObj.getLastRefTime();

                    if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap
                            .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                        // abort the update task
                        NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                        return;
                    }
                    if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                        incFailCount();
                        return;
                    }
                    delayTime = serviceObj.getCacheMillis();
                    resetFailCount();
                } catch (Throwable e) {
                    incFailCount();
                    NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
                } finally {
                    executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
                }
            }
        }
    }
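The finally block of UpdateTask.run implements an exponential backoff: the delay is shifted left once per recorded failure and capped at DEFAULT_DELAY * 60. The sketch below isolates that arithmetic so the resulting delays can be checked by hand. UpdateBackoffSketch is a hypothetical name; the constants and expressions are copied from the listing above.

```java
// Hypothetical helper isolating the backoff arithmetic of UpdateTask.
class UpdateBackoffSketch {
    static final long DEFAULT_DELAY = 1000L;
    static final int FAIL_LIMIT = 6;

    // Same expression as in UpdateTask.run: double per failure, cap at 60 seconds.
    static long nextDelay(long delayTime, int failCount) {
        return Math.min(delayTime << failCount, DEFAULT_DELAY * 60);
    }

    // Same rule as incFailCount: stop counting once the limit is reached.
    static int incFailCount(int failCount) {
        return failCount == FAIL_LIMIT ? failCount : failCount + 1;
    }

    public static void main(String[] args) {
        int failCount = 0;
        for (int attempt = 0; attempt <= FAIL_LIMIT + 1; attempt++) {
            System.out.println("failCount=" + failCount + " delayMs=" + nextDelay(DEFAULT_DELAY, failCount));
            failCount = incFailCount(failCount);
        }
    }
}
```

With a base delay of 1000 ms the delays grow as 1000, 2000, 4000, 8000, 16000, 32000 and then stay at the 60000 ms cap, because 1000 << 6 is 64000.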
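Summary
One point in the whole NacosNamingService initialization process is well worth learning from: the single responsibility principle is applied thoroughly in the design of every class.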
