Nacos服務發現


基礎配置初始化

NacosDiscoveryClientConfiguration
NacosDiscoveryProperties

初始化Nacos基礎配置信息的bean,主要指yaml中配置Nacos服務相關的信息。

NacosServiceDiscovery

初始化獲取Nacos服務和實例的bean,通過該bean可以獲取服務的信息和實例的信息。

NacosDiscoveryClientConfiguration

NacosDiscoveryClient

初始化NacosDiscoveryClient的bean,本質上就是實現NacosServiceDiscovery的實現。該類的作用就是獲取到實例信息和服務信息。

NacosWatch

從繼承結構上看,NacosWatch主要實現了SmartLifecycle和ApplicationEventPublisherAware,ApplicationEventPublisherAware就是發布事件,這里主要指的就是發布HeartbeatEvent
事件上報心跳。SmartLifecycle該接口主要是作用是所有的bean都創建完成之后,可以執行自己的初始化工作,或者在退出時執行資源銷毀工作。NacosWatch的start方法,主要是完成以下四件事情:
  1. 加入NamingEvent監聽;
  2. 獲取NamingService;
  3. 訂閱NamingService監聽事件;
  4. 發布HeartbeatEvent事件;
 public void start() {
        //加入NamingEvent監聽
        if (this.running.compareAndSet(false, true)) {
            //更新本地的Instance
            EventListener eventListener = listenerMap.computeIfAbsent(buildKey(),
                    event -> new EventListener() {
                        @Override
                        public void onEvent(Event event) {
                            if (event instanceof NamingEvent) {
                                List<Instance> instances = ((NamingEvent) event)
                                        .getInstances();
                                Optional<Instance> instanceOptional = selectCurrentInstance(
                                        instances);
                                instanceOptional.ifPresent(currentInstance -> {
                                    resetIfNeeded(currentInstance);
                                });
                            }
                        }
                    });
            //獲取NamingService
            NamingService namingService = nacosServiceManager
                    .getNamingService(properties.getNacosProperties());
            try {
                //訂閱相關NamingService的事件
                namingService.subscribe(properties.getService(), properties.getGroup(),
                        Arrays.asList(properties.getClusterName()), eventListener);
            }
            catch (Exception e) {
                log.error("namingService subscribe failed, properties:{}", properties, e);
            }
            //發布HeartbeatEvent事件
            this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
                    this::nacosServicesWatch, this.properties.getWatchDelay());
        }
    }
View Code

NacosWatch的stop方法主要是完成兩件事情:

  1. 釋放掉監聽的線程池資源;
  2. 取消NamingService相關的監聽事件;
    @Override
    public void stop() {
        if (this.running.compareAndSet(true, false)) {
            //關閉和釋放watch的線程池
            if (this.watchFuture != null) {
                // shutdown current user-thread,
                // then the other daemon-threads will terminate automatic.
                ((ThreadPoolTaskScheduler) this.taskScheduler).shutdown();
                this.watchFuture.cancel(true);
            }

            EventListener eventListener = listenerMap.get(buildKey());
            try {
                //取消NamingService相關的訂閱信息
                NamingService namingService = nacosServiceManager
                        .getNamingService(properties.getNacosProperties());
                namingService.unsubscribe(properties.getService(), properties.getGroup(),
                        Arrays.asList(properties.getClusterName()), eventListener);
            }
            catch (NacosException e) {
                log.error("namingService unsubscribe failed, properties:{}", properties,
                        e);
            }
        }
    }
View Code

NacosNamingService

客戶端信息的初始化發生在發起調用的時候,是一種懶加載的方式,並沒有在初始化完成的時候就進行,這部分我們分析Ribbon源碼的時候我們具體在講解一下。我們的重點看的是NacosNamingService,從基礎配置類中的NacosServiceDiscovery的getInstances的方法調用追蹤到更下層我們會發現,與服務端交互的重點的類就是NacosNamingService,NacosNamingService在初始化的時候,主要做了以下10事件

  1. initNamespaceForNaming:用於初始命名空間,在Nacos中命名空間用於租戶粗粒度隔離,同時還可以進行環境的區別,如開發環境和測試環境等等;
  2. initSerialization:序列化初始化;
  3. initServerAddr:初始化服務器地址,其中涉及到的endpoint 等;
  4. initWebRootContext:初始化web上下文,其支持通過阿里雲EDAS進行部署;
  5. initCacheDir:初始化緩存目錄;
  6. initLogName:從配置中獲取日志文件;
  7. EventDispatcher:監聽事件分發,當客戶端訂閱了某個服務信息后,會以Listener的方式注冊到EventDispatcher的隊列中,當有服務變化的時候,會通知訂閱者;
  8. NamingProxy:服務端的代理,用於客戶端與服務端的通信;
  9. BeatReactor:用於維持與服務器之間的心跳通信,上報客戶端注冊到服務端的服務信息;
  10. HostReactor:用於客戶端服務的訂閱,以及從服務端更新服務信息;
initNamespaceForNaming
//初始化獲取Namespace
public static String initNamespaceForNaming(Properties properties) {
        String tmpNamespace = null;
        
        //是否使用阿里雲上環境進行解析,默認為true,如果沒有進行配置,
        //默認使用DEFAULT_USE_CLOUD_NAMESPACE_PARSING
        String isUseCloudNamespaceParsing = properties.getProperty(PropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
                System.getProperty(SystemPropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
                        String.valueOf(Constants.DEFAULT_USE_CLOUD_NAMESPACE_PARSING)));
        
        if (Boolean.parseBoolean(isUseCloudNamespaceParsing)) {
            
            tmpNamespace = TenantUtil.getUserTenantForAns();
            //從系統變量獲取namespace
            tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
                @Override
                public String call() {
                    String namespace = System.getProperty(SystemPropertyKeyConst.ANS_NAMESPACE);
                    LogUtils.NAMING_LOGGER.info("initializer namespace from System Property :" + namespace);
                    return namespace;
                }
            });
            
            tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
                @Override
                public String call() {
                    String namespace = System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_NAMESPACE);
                    LogUtils.NAMING_LOGGER.info("initializer namespace from System Environment :" + namespace);
                    return namespace;
                }
            });
        }
        //如果不是上雲環境,那么從系統變量獲取namespace
        tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
            @Override
            public String call() {
                String namespace = System.getProperty(PropertyKeyConst.NAMESPACE);
                LogUtils.NAMING_LOGGER.info("initializer namespace from System Property :" + namespace);
                return namespace;
            }
        });
        //從properties中獲取namespace
        if (StringUtils.isEmpty(tmpNamespace) && properties != null) {
            tmpNamespace = properties.getProperty(PropertyKeyConst.NAMESPACE);
        }
        //獲取系統默認的namespace
        tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
            @Override
            public String call() {
                return UtilAndComs.DEFAULT_NAMESPACE_ID;
            }
        });
        return tmpNamespace;
    }
View Code
initServerAddr
//初始化服務器地址
private void initServerAddr(Properties properties) {
    //從properties中獲取服務器地址
    serverList = properties.getProperty(PropertyKeyConst.SERVER_ADDR);
    //初始化endpoint,如果有endpoint,則廢棄serverList
    endpoint = InitUtils.initEndpoint(properties);
    if (StringUtils.isNotEmpty(endpoint)) {
        serverList = "";
    }
}
public static String initEndpoint(final Properties properties) {
    if (properties == null) {

        return "";
    }
    // Whether to enable domain name resolution rules
    //是否使用endpoint解析,默認為true,也就是:USE_ENDPOINT_PARSING_RULE_DEFAULT_VALUE
    String isUseEndpointRuleParsing =
        properties.getProperty(PropertyKeyConst.IS_USE_ENDPOINT_PARSING_RULE,
            System.getProperty(SystemPropertyKeyConst.IS_USE_ENDPOINT_PARSING_RULE,
                String.valueOf(ParamUtil.USE_ENDPOINT_PARSING_RULE_DEFAULT_VALUE)));

    boolean isUseEndpointParsingRule = Boolean.valueOf(isUseEndpointRuleParsing);
    String endpointUrl;
    //使用endpoint解析功能
    if (isUseEndpointParsingRule) {
        // Get the set domain name information
        endpointUrl = ParamUtil.parsingEndpointRule(properties.getProperty(PropertyKeyConst.ENDPOINT));
        if (StringUtils.isBlank(endpointUrl)) {
            return "";
        }
    } else {
        //不使用的化,直接通過properties文件來獲取
        endpointUrl = properties.getProperty(PropertyKeyConst.ENDPOINT);
    }

    if (StringUtils.isBlank(endpointUrl)) {
        return "";
    }

    //獲取endpoint的端口
    String endpointPort = TemplateUtils.stringEmptyAndThenExecute(System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_ENDPOINT_PORT), new Callable<String>() {
        @Override
        public String call() {

            return properties.getProperty(PropertyKeyConst.ENDPOINT_PORT);
        }
    });

    endpointPort = TemplateUtils.stringEmptyAndThenExecute(endpointPort, new Callable<String>() {
        @Override
        public String call() {
            return "8080";
        }
    });

    return endpointUrl + ":" + endpointPort;
}
View Code
initWebRootContext
//阿里雲EDAS相關的
public static void initWebRootContext() {
        // support the web context with ali-yun if the app deploy by EDAS
        final String webContext = System.getProperty(SystemPropertyKeyConst.NAMING_WEB_CONTEXT);
        TemplateUtils.stringNotEmptyAndThenExecute(webContext, new Runnable() {
            @Override
            public void run() {
                UtilAndComs.webContext = webContext.indexOf("/") > -1 ? webContext : "/" + webContext;
                
                UtilAndComs.nacosUrlBase = UtilAndComs.webContext + "/v1/ns";
                UtilAndComs.nacosUrlInstance = UtilAndComs.nacosUrlBase + "/instance";
            }
        });
    }
View Code
initCacheDir
   //初始化緩存目錄,用於存放從服務端獲取的服務信息,如果客戶端與服務端斷開了連接,將會使用緩存的信息 
   private void initCacheDir() {
        cacheDir = System.getProperty("com.alibaba.nacos.naming.cache.dir");
        if (StringUtils.isEmpty(cacheDir)) {
            cacheDir = System.getProperty("user.home") + "/nacos/naming/" + namespace;
        }
    }
View Code
initLogName
//初始化日志存放路徑    
private void initLogName(Properties properties) {
        logName = System.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME);
        if (StringUtils.isEmpty(logName)) {

            if (properties != null && StringUtils
                    .isNotEmpty(properties.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME))) {
                logName = properties.getProperty(UtilAndComs.NACOS_NAMING_LOG_NAME);
            } else {
                logName = "naming.log";
            }
        }
  }
View Code
EventDispatcher

EventDispatcher維護一個阻塞隊列,主要存儲發生改變的服務的信息,維護了一個對於服務的監聽隊列的映射的Map,實時的將服務變化信息同步給監聽者,這樣客戶端就可以通過注冊監聽者實現在服務變化后動態進行操作。

public class EventDispatcher implements Closeable {
    
    private ExecutorService executor = null;
    
    //發生了變化的服務隊列
    private final BlockingQueue<ServiceInfo> changedServices = new LinkedBlockingQueue<ServiceInfo>();
    //監聽者維護映射
    private final ConcurrentMap<String, List<EventListener>> observerMap = new ConcurrentHashMap<String, List<EventListener>>();
    
    private volatile boolean closed = false;
    
    public EventDispatcher() {
        
        this.executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r, "com.alibaba.nacos.naming.client.listener");
                thread.setDaemon(true);
                
                return thread;
            }
        });
        
        this.executor.execute(new Notifier());
    }
    
    /**
     * Add listener.
     *
     * @param serviceInfo service info
     * @param clusters    clusters
     * @param listener    listener
     */
    public void addListener(ServiceInfo serviceInfo, String clusters, EventListener listener) {
        
        NAMING_LOGGER.info("[LISTENER] adding " + serviceInfo.getName() + " with " + clusters + " to listener map");
        List<EventListener> observers = Collections.synchronizedList(new ArrayList<EventListener>());
        observers.add(listener);
        
        observers = observerMap.putIfAbsent(ServiceInfo.getKey(serviceInfo.getName(), clusters), observers);
        if (observers != null) {
            observers.add(listener);
        }
        
        serviceChanged(serviceInfo);
    }
    
    /**
     * Remove listener.
     *
     * @param serviceName service name
     * @param clusters    clusters
     * @param listener    listener
     */
    public void removeListener(String serviceName, String clusters, EventListener listener) {
        
        NAMING_LOGGER.info("[LISTENER] removing " + serviceName + " with " + clusters + " from listener map");
        
        List<EventListener> observers = observerMap.get(ServiceInfo.getKey(serviceName, clusters));
        if (observers != null) {
            Iterator<EventListener> iter = observers.iterator();
            while (iter.hasNext()) {
                EventListener oldListener = iter.next();
                if (oldListener.equals(listener)) {
                    iter.remove();
                }
            }
            if (observers.isEmpty()) {
                observerMap.remove(ServiceInfo.getKey(serviceName, clusters));
            }
        }
    }
    
    public boolean isSubscribed(String serviceName, String clusters) {
        return observerMap.containsKey(ServiceInfo.getKey(serviceName, clusters));
    }
    
    public List<ServiceInfo> getSubscribeServices() {
        List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>();
        for (String key : observerMap.keySet()) {
            serviceInfos.add(ServiceInfo.fromKey(key));
        }
        return serviceInfos;
    }
    
    /**
     * Service changed.
     *
     * @param serviceInfo service info
     */
    public void serviceChanged(ServiceInfo serviceInfo) {
        if (serviceInfo == null) {
            return;
        }
        
        changedServices.add(serviceInfo);
    }
    
    @Override
    public void shutdown() throws NacosException {
        String className = this.getClass().getName();
        NAMING_LOGGER.info("{} do shutdown begin", className);
        ThreadUtils.shutdownThreadPool(executor, NAMING_LOGGER);
        closed = true;
        NAMING_LOGGER.info("{} do shutdown stop", className);
    }
    //服務變化通知線程
    private class Notifier implements Runnable {
        
        @Override
        public void run() {
            while (!closed) {
                
                ServiceInfo serviceInfo = null;
                try {
                    //從隊列取出變化消息
                    serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
                } catch (Exception ignore) {
                }
                
                if (serviceInfo == null) {
                    continue;
                }
                
                try {
                    //獲取監聽者隊列
                    List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
                    //遍歷監聽者隊列,調用其onEvent方法
                    if (!CollectionUtils.isEmpty(listeners)) {
                        for (EventListener listener : listeners) {
                            List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
                            listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(),
                                    serviceInfo.getClusters(), hosts));
                        }
                    }
                    
                } catch (Exception e) {
                    NAMING_LOGGER.error("[NA] notify error for service: " + serviceInfo.getName() + ", clusters: "
                            + serviceInfo.getClusters(), e);
                }
            }
        }
    }
}
View Code
NamingProxy

NamingProxy封裝了與服務端的操作,代碼相對比較簡單,值得注意的是如果配置安全相關的內容,在初始化的時候該處會進行一個定時任務的查詢,如果對安全要求比較高,可重SecurityProxy部分內容,當然服務端部分也需要重寫,大部分的情況這注冊中心一般暴露在內網環境下,基本上不需要重寫的。

BeatReactor

BeatReactor負責將客戶端的信息上報和下線,對於非持久化的內容采用周期上報內容,這部分在服務心跳的時候我們講解過,這里不進行源碼分析,大家重點關注addBeatInfo、removeBeatInfo和BeatTask的內容,相對比較簡單。

HostReactor

HostReactor主要負責客戶端獲取服務端注冊的信息的部分,主要分為三個部分:

  1. 客戶端需要調用NacosNamingService獲取服務信息方法的時候,HostReactor負責把服務信息維護本地緩存的serviceInfoMap中,並且通過UpdateTask定時更新已存在的服務;
  2. HostReactor內部維護PushReceiver對象,負責接收服務端通過UDP協議推送過來的服務變更的信息,並更新到本地緩存serviceInfoMap當中;
  3. HostReactor內部維護FailoverReactor對象,負責當服務端不可用的時候,切換到本地文件緩存模式,從本地文件的緩存中獲取服務信息; 
public class HostReactor implements Closeable {
    
    private static final long DEFAULT_DELAY = 1000L;
    
    private static final long UPDATE_HOLD_INTERVAL = 5000L;
    
    private final Map<String, ScheduledFuture<?>> futureMap = new HashMap<String, ScheduledFuture<?>>();
    
    private final Map<String, ServiceInfo> serviceInfoMap;
    
    private final Map<String, Object> updatingMap;
    //接收UDP服務端UDP協議
    private final PushReceiver pushReceiver;
    //阻塞隊列的變更消息處理
    private final EventDispatcher eventDispatcher;
    //心跳上報
    private final BeatReactor beatReactor;
    //HTTP請求消息的處理
    private final NamingProxy serverProxy;
    //服務不可用時本地降級文件的處理模式
    private final FailoverReactor failoverReactor;
    
    private final String cacheDir;
    //定時更新服務消息的定時任務
    private final ScheduledExecutorService executor;
    
    public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor,
            String cacheDir) {
        this(eventDispatcher, serverProxy, beatReactor, cacheDir, false, UtilAndComs.DEFAULT_POLLING_THREAD_COUNT);
    }
    
    public HostReactor(EventDispatcher eventDispatcher, NamingProxy serverProxy, BeatReactor beatReactor,
            String cacheDir, boolean loadCacheAtStart, int pollingThreadCount) {
        // init executorService
        this.executor = new ScheduledThreadPoolExecutor(pollingThreadCount, new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setDaemon(true);
                thread.setName("com.alibaba.nacos.client.naming.updater");
                return thread;
            }
        });
        this.eventDispatcher = eventDispatcher;
        this.beatReactor = beatReactor;
        this.serverProxy = serverProxy;
        this.cacheDir = cacheDir;
        if (loadCacheAtStart) {
            this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(DiskCache.read(this.cacheDir));
        } else {
            this.serviceInfoMap = new ConcurrentHashMap<String, ServiceInfo>(16);
        }
        
        this.updatingMap = new ConcurrentHashMap<String, Object>();
        this.failoverReactor = new FailoverReactor(this, cacheDir);
        this.pushReceiver = new PushReceiver(this);
    }
    
    public Map<String, ServiceInfo> getServiceInfoMap() {
        return serviceInfoMap;
    }
    
    public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
        return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
    }
    
    /**
     * Process service json.
     *
     * @param json service json
     * @return service info
     */
    //處理從服務端接收到的數據
    public ServiceInfo processServiceJson(String json) {
        ServiceInfo serviceInfo = JacksonUtils.toObj(json, ServiceInfo.class);
        ServiceInfo oldService = serviceInfoMap.get(serviceInfo.getKey());
        if (serviceInfo.getHosts() == null || !serviceInfo.validate()) {
            //empty or error push, just ignore
            return oldService;
        }
        
        boolean changed = false;
        //新老信息對比處理
        if (oldService != null) {
            //如果本地舊服務的獲取時間比服務器端獲取的時間新,則保留本地舊服務的時間
            if (oldService.getLastRefTime() > serviceInfo.getLastRefTime()) {
                NAMING_LOGGER.warn("out of date data received, old-t: " + oldService.getLastRefTime() + ", new-t: "
                        + serviceInfo.getLastRefTime());
            }
            //用新服務信息替換serviceInfoMap
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
            
            Map<String, Instance> oldHostMap = new HashMap<String, Instance>(oldService.getHosts().size());
            for (Instance host : oldService.getHosts()) {
                oldHostMap.put(host.toInetAddr(), host);
            }
            
            Map<String, Instance> newHostMap = new HashMap<String, Instance>(serviceInfo.getHosts().size());
            for (Instance host : serviceInfo.getHosts()) {
                newHostMap.put(host.toInetAddr(), host);
            }
            
            Set<Instance> modHosts = new HashSet<Instance>();
            Set<Instance> newHosts = new HashSet<Instance>();
            Set<Instance> remvHosts = new HashSet<Instance>();
            
            List<Map.Entry<String, Instance>> newServiceHosts = new ArrayList<Map.Entry<String, Instance>>(
                    newHostMap.entrySet());
            for (Map.Entry<String, Instance> entry : newServiceHosts) {
                Instance host = entry.getValue();
                String key = entry.getKey();
                if (oldHostMap.containsKey(key) && !StringUtils
                        .equals(host.toString(), oldHostMap.get(key).toString())) {
                    modHosts.add(host);
                    continue;
                }
                
                if (!oldHostMap.containsKey(key)) {
                    newHosts.add(host);
                }
            }
            
            for (Map.Entry<String, Instance> entry : oldHostMap.entrySet()) {
                Instance host = entry.getValue();
                String key = entry.getKey();
                if (newHostMap.containsKey(key)) {
                    continue;
                }
                
                if (!newHostMap.containsKey(key)) {
                    remvHosts.add(host);
                }
                
            }
            
            if (newHosts.size() > 0) {
                changed = true;
                NAMING_LOGGER.info("new ips(" + newHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(newHosts));
            }
            
            if (remvHosts.size() > 0) {
                changed = true;
                NAMING_LOGGER.info("removed ips(" + remvHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(remvHosts));
            }
            
            if (modHosts.size() > 0) {
                changed = true;
                updateBeatInfo(modHosts);
                NAMING_LOGGER.info("modified ips(" + modHosts.size() + ") service: " + serviceInfo.getKey() + " -> "
                        + JacksonUtils.toJson(modHosts));
            }
            
            serviceInfo.setJsonFromServer(json);
            
            if (newHosts.size() > 0 || remvHosts.size() > 0 || modHosts.size() > 0) {
                //服務信息變更寫入阻塞隊列
                eventDispatcher.serviceChanged(serviceInfo);
                //磁盤緩存希爾
                DiskCache.write(serviceInfo, cacheDir);
            }
            
        } else {
            //新服務的信息直接加入本地緩存
            changed = true;
            NAMING_LOGGER.info("init new ips(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                    + JacksonUtils.toJson(serviceInfo.getHosts()));
            serviceInfoMap.put(serviceInfo.getKey(), serviceInfo);
            eventDispatcher.serviceChanged(serviceInfo);
            serviceInfo.setJsonFromServer(json);
            DiskCache.write(serviceInfo, cacheDir);
        }
        //上報數量
        MetricsMonitor.getServiceInfoMapSizeMonitor().set(serviceInfoMap.size());
        
        if (changed) {
            NAMING_LOGGER.info("current ips:(" + serviceInfo.ipCount() + ") service: " + serviceInfo.getKey() + " -> "
                    + JacksonUtils.toJson(serviceInfo.getHosts()));
        }
        
        return serviceInfo;
    }
    
    private void updateBeatInfo(Set<Instance> modHosts) {
        for (Instance instance : modHosts) {
            String key = beatReactor.buildKey(instance.getServiceName(), instance.getIp(), instance.getPort());
            if (beatReactor.dom2Beat.containsKey(key) && instance.isEphemeral()) {
                BeatInfo beatInfo = beatReactor.buildBeatInfo(instance);
                beatReactor.addBeatInfo(instance.getServiceName(), beatInfo);
            }
        }
    }
    
    //通過key獲取服務對象
    private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
        //得到ServiceInfo的key
        String key = ServiceInfo.getKey(serviceName, clusters);
        //從本地緩存中獲取服務信息
        return serviceInfoMap.get(key);
    }
    //從服務器端獲取Service信息,並解析為ServiceInfo對象
    public ServiceInfo getServiceInfoDirectlyFromServer(final String serviceName, final String clusters)
            throws NacosException {
        String result = serverProxy.queryList(serviceName, clusters, 0, false);
        if (StringUtils.isNotEmpty(result)) {
            return JacksonUtils.toObj(result, ServiceInfo.class);
        }
        return null;
    }
    
    public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
        
        NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
       
        String key = ServiceInfo.getKey(serviceName, clusters);
        //是否開啟本地文件緩存模式
        if (failoverReactor.isFailoverSwitch()) {
            return failoverReactor.getService(key);
        }
        
        ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
        //從serviceInfoMap獲取serviceObj,如果沒有serviceObj,則新生成一個
        if (null == serviceObj) {
            serviceObj = new ServiceInfo(serviceName, clusters);
            
            serviceInfoMap.put(serviceObj.getKey(), serviceObj);
            
            updatingMap.put(serviceName, new Object());
            updateServiceNow(serviceName, clusters);
            updatingMap.remove(serviceName);
            
        } else if (updatingMap.containsKey(serviceName)) {
            //如果更新列表中包含服務,則等待更新結束
            if (UPDATE_HOLD_INTERVAL > 0) {
                // hold a moment waiting for update finish
                synchronized (serviceObj) {
                    try {
                        serviceObj.wait(UPDATE_HOLD_INTERVAL);
                    } catch (InterruptedException e) {
                        NAMING_LOGGER
                                .error("[getServiceInfo] serviceName:" + serviceName + ", clusters:" + clusters, e);
                    }
                }
            }
        }
        //添加更新調度任務
        scheduleUpdateIfAbsent(serviceName, clusters);
        
        return serviceInfoMap.get(serviceObj.getKey());
    }
    //從服務端更新服務
    private void updateServiceNow(String serviceName, String clusters) {
        try {
            updateService(serviceName, clusters);
        } catch (NacosException e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        }
    }
    
    /**
     * Schedule update if absent.
     *
     * @param serviceName service name
     * @param clusters    clusters
     */
    //添加更新調度任務
    public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        
        synchronized (futureMap) {
            if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
                return;
            }
            
            ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
            futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
        }
    }
    
    /**
     * Update service now.
     *
     * @param serviceName service name
     * @param clusters    clusters
     */
    public void updateService(String serviceName, String clusters) throws NacosException {
        ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
        try {
            
            String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
            
            if (StringUtils.isNotEmpty(result)) {
                processServiceJson(result);
            }
        } finally {
            if (oldService != null) {
                synchronized (oldService) {
                    oldService.notifyAll();
                }
            }
        }
    }
    
    /**
     * Refresh only.
     *
     * @param serviceName service name
     * @param clusters    cluster
     */
    public void refreshOnly(String serviceName, String clusters) {
        try {
            serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
        } catch (Exception e) {
            NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e);
        }
    }
    
    @Override
    public void shutdown() throws NacosException {
        String className = this.getClass().getName();
        NAMING_LOGGER.info("{} do shutdown begin", className);
        ThreadUtils.shutdownThreadPool(executor, NAMING_LOGGER);
        pushReceiver.shutdown();
        failoverReactor.shutdown();
        NAMING_LOGGER.info("{} do shutdown stop", className);
    }
    
    //定時更新已存在的服務
    public class UpdateTask implements Runnable {
        
        long lastRefTime = Long.MAX_VALUE;
        
        private final String clusters;
        
        private final String serviceName;
        
        /**
         * the fail situation. 1:can't connect to server 2:serviceInfo's hosts is empty
         */
        private int failCount = 0;
        
        public UpdateTask(String serviceName, String clusters) {
            this.serviceName = serviceName;
            this.clusters = clusters;
        }
        
        private void incFailCount() {
            int limit = 6;
            if (failCount == limit) {
                return;
            }
            failCount++;
        }
        
        private void resetFailCount() {
            failCount = 0;
        }
        
        @Override
        public void run() {
            long delayTime = DEFAULT_DELAY;
            
            try {
                ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                
                if (serviceObj == null) {
                    updateService(serviceName, clusters);
                    return;
                }
                
                if (serviceObj.getLastRefTime() <= lastRefTime) {
                    updateService(serviceName, clusters);
                    serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
                } else {
                    // if serviceName already updated by push, we should not override it
                    // since the push data may be different from pull through force push
                    refreshOnly(serviceName, clusters);
                }
                
                lastRefTime = serviceObj.getLastRefTime();
                
                if (!eventDispatcher.isSubscribed(serviceName, clusters) && !futureMap
                        .containsKey(ServiceInfo.getKey(serviceName, clusters))) {
                    // abort the update task
                    NAMING_LOGGER.info("update task is stopped, service:" + serviceName + ", clusters:" + clusters);
                    return;
                }
                if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
                    incFailCount();
                    return;
                }
                delayTime = serviceObj.getCacheMillis();
                resetFailCount();
            } catch (Throwable e) {
                incFailCount();
                NAMING_LOGGER.warn("[NA] failed to update serviceName: " + serviceName, e);
            } finally {
                executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
            }
        }
    }
}
View Code
總結

從NacosNamingService初始化的整個過程中,很重要的一點值得我們學習,就是單一職責這個理念在每個類的設計上表現的淋淋盡致。

結束

歡迎大家點點關注,點點贊,上海地區幫助團隊招兩個Java,其中兩點比較重要:靠譜和全日制本科,正常的業務開發,整體技術棧Dubbo+GateWay網關,有意者私聊!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM