Nacos服務發現源碼解析


1.Spring服務發現的統一規范

Spring將這套規范定義在Spring Cloud Common中

 

discovery包下面定義了服務發現的規范

核心接口:DiscoveryClient 用於服務發現

2.Nacos客戶端實現服務發現

 

NacosDiscoveryClient 實現 DiscoveryClient接口。

當 Nacos 客戶端啟動之後,並不會主動去請求服務信息,只會做服務注冊、配置獲取等工作。

只有在第一次請求某個服務的時候,才會去獲取該服務的實例信息,也就是懶加載。

1.先在本地緩存中查找實例信息;如果緩存為空,則立即請求服務端拉取實例信息列表並緩存到本地;隨後(如果尚未添加)再添加定時任務,定期從服務端拉取實例信息來更新本地緩存

/**
 * Returns the instances of the given service by delegating to
 * {@code serviceDiscovery.getInstances(serviceId)}.
 *
 * @param serviceId id of the service whose instances are requested
 * @return the list returned by {@code serviceDiscovery}
 */
@Override
public List<ServiceInstance> getInstances(String serviceId) {
    try {
        return serviceDiscovery.getInstances(serviceId);
    }
    // NOTE(review): the article elides the remainder of this method here
    // (presumably the catch clause of the try above) — confirm against the real source.
    ...省略
}
/**
 * Looks up the instances of {@code serviceId} in the group configured in
 * {@code discoveryProperties} and converts them with {@code hostToServiceInstanceList}.
 *
 * @param serviceId id of the service to look up
 * @return the converted service instances
 * @throws NacosException if the naming service lookup fails
 */
public List<ServiceInstance> getInstances(String serviceId) throws NacosException {
    final String configuredGroup = discoveryProperties.getGroup();
    // NOTE(review): the literal `true` is presumably the "healthy only" flag — confirm against NamingService.
    final List<Instance> found = namingService().selectInstances(serviceId, configuredGroup, true);
    return hostToServiceInstanceList(found, serviceId);
}
/**
 * Resolves the {@code ServiceInfo} for a grouped service name and delegates to
 * {@code selectInstances(serviceInfo, healthy)}.
 *
 * @param serviceName name of the service
 * @param groupName   group the service belongs to
 * @param clusters    cluster names, joined with "," before being passed on
 * @param healthy     forwarded unchanged to {@code selectInstances(serviceInfo, healthy)}
 * @param subscribe   when {@code true} the info comes from {@code hostReactor.getServiceInfo};
 *                    otherwise from {@code hostReactor.getServiceInfoDirectlyFromServer}
 * @return the instances selected from the resolved service info
 * @throws NacosException if the lookup fails
 */
public List<Instance> selectInstances(String serviceName, String groupName, List<String> clusters, boolean healthy,
        boolean subscribe) throws NacosException {
    final String groupedName = NamingUtils.getGroupedName(serviceName, groupName);
    final String joinedClusters = StringUtils.join(clusters, ",");
    // Callers subscribe by default; the non-subscribing path asks the server directly.
    final ServiceInfo resolved = subscribe
            ? hostReactor.getServiceInfo(groupedName, joinedClusters)
            : hostReactor.getServiceInfoDirectlyFromServer(groupedName, joinedClusters);
    return selectInstances(resolved, healthy);
}
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    // 本地緩存找
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    if (null == serviceObj) {
        serviceObj = new ServiceInfo(serviceName, clusters);
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        updatingMap.put(serviceName, new Object());
        // 從服務端拉取服務信息
 updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);
    } else if (updatingMap.containsKey(serviceName)) {
        ...省略
    }
   //添加定時任務 scheduleUpdateIfAbsent(serviceName, clusters);
return serviceInfoMap.get(serviceObj.getKey()); } // 更新服務信息 private void updateServiceNow(String serviceName, String clusters) { try { updateService(serviceName, clusters); } catch (NacosException e) { NAMING_LOGGER.error("[NA] failed to update serviceName: " + serviceName, e); } } //如果不存在,則添加定時任務 public void scheduleUpdateIfAbsent(String serviceName, String clusters) { if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } synchronized (futureMap) { if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } //添加定時任務 ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters)); futureMap.put(ServiceInfo.getKey(serviceName, clusters), future); } } // 從服務端拉取服務信息 public void updateService(String serviceName, String clusters) throws NacosException { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); if (StringUtils.isNotEmpty(result)) { // 將服務端拉取到的服務信息緩存在本地 processServiceJson(result); } } finally { if (oldService != null) { synchronized (oldService) { oldService.notifyAll(); } } } } //查詢列表 public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException { final Map<String, String> params = new HashMap<String, String>(8); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put("clusters", clusters); params.put("udpPort", String.valueOf(udpPort)); params.put("clientIP", NetUtils.localIP()); params.put("healthyOnly", String.valueOf(healthyOnly)); //最終調用Http請求拉取服務器服務信息列表 return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET); }

3.服務端服務發現

 naming項目下的 InstanceController類

/**
 * Handles {@code GET /list}: reads the query parameters from the request and delegates to
 * {@code doSrvIpxt} to build the instance-list response.
 *
 * @param request the incoming HTTP request
 * @return the JSON node produced by {@code doSrvIpxt}
 * @throws Exception propagated from parameter handling or {@code doSrvIpxt}
 */
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
    // Namespace falls back to the default id; the service name is read via WebUtils.required
    // and then format-checked.
    final String namespace = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    final String service = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(service);

    // Remaining parameters are read in the same order as before; each has a default.
    final String userAgent = WebUtils.getUserAgent(request);
    final String clusterNames = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    final String callerIp = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    final String rawUdpPort = WebUtils.optional(request, "udpPort", "0");
    final int callerUdpPort = Integer.parseInt(rawUdpPort);
    final String environment = WebUtils.optional(request, "env", StringUtils.EMPTY);
    final String rawIsCheck = WebUtils.optional(request, "isCheck", "false");
    final boolean checkOnly = Boolean.parseBoolean(rawIsCheck);
    final String appName = WebUtils.optional(request, "app", StringUtils.EMPTY);
    final String tenantId = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    final String rawHealthyOnly = WebUtils.optional(request, "healthyOnly", "false");
    final boolean onlyHealthy = Boolean.parseBoolean(rawHealthyOnly);

    // Fetch the instance information for this namespace id and service name.
    return doSrvIpxt(namespace, service, userAgent, clusterNames, callerIp, callerUdpPort, environment,
            checkOnly, appName, tenantId, onlyHealthy);
}

/**
 * Builds the response for an instance-list query on one service.
 *
 * <p>Visible steps: look up the service, try to register the caller as a push client,
 * return an empty-hosts result when the service does not exist, otherwise collect the
 * service's instances for the requested clusters and run them through the service's
 * selector. The rest of the method is elided in the article.
 *
 * @param namespaceId namespace the service is looked up in
 * @param serviceName name of the service
 * @param agent       user agent of the caller; used for {@code ClientInfo} and the push check
 * @param clusters    comma-separated cluster names
 * @param clientIP    caller IP; used for the push address and the selector
 * @param udpPort     caller UDP port; push registration is attempted only when it is positive
 * @param env         not used in the visible part of the method
 * @param isCheck     not used in the visible part of the method
 * @param app         passed to {@code pushService.addClient}
 * @param tid         passed to {@code pushService.addClient}
 * @param healthyOnly not used in the visible part of the method
 * @return the JSON result node
 * @throws Exception propagated from the calls below
 */
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,
        int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    // Look up the service.
    Service service = serviceManager.getService(namespaceId, serviceName);
    long cacheMillis = switchDomain.getDefaultCacheMillis();
    // Try to enable push for this caller.
    try {
        if (udpPort > 0 && pushService.canEnablePush(agent)) {
            pushService.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort),
                            pushDataSource, tid, app);
            // Push was enabled, so the push-specific cache time replaces the default.
            cacheMillis = switchDomain.getPushCacheMillis(serviceName);
        }
    } catch (Exception e) {
        // Push registration is best-effort: log and fall back to the default cache time.
        Loggers.SRV_LOG.error("[NACOS-API] failed to added push client {}, {}:{}", clientInfo, clientIP, udpPort, e);
        cacheMillis = switchDomain.getDefaultCacheMillis();
    }
    // Unknown service: answer with an empty host list instead of failing.
    if (service == null) {
        if (Loggers.SRV_LOG.isDebugEnabled()) {
            Loggers.SRV_LOG.debug("no instance to serve for service: {}", serviceName);
        }
        result.put("name", serviceName);
        result.put("clusters", clusters);
        result.put("cacheMillis", cacheMillis);
        result.replace("hosts", JacksonUtils.createEmptyArrayNode());
        return result;
    }
    // Check whether the service is disabled.
    checkIfDisabled(service);
    List<Instance> srvedIPs;
    // Get all persistent and ephemeral instances of the requested clusters.
    srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
    // Filter the instances through the service's selector, if one is set and the caller IP is known.
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIPs = service.getSelector().select(clientIP, srvedIPs);
    }
    // NOTE(review): the article elides the rest of the method with the marker line below
    // (it reads "if no instance is found, return the current service") — confirm against the real source.
    ...如果找不到服務則返回當前服務
}
/**
 * Returns the instances of the given clusters via {@code allIPs(clusters)}.
 *
 * @param clusters cluster names; when null or empty, every key of {@code clusterMap} is used
 * @return the instances returned by {@code allIPs} for the effective cluster list
 */
public List<Instance> srvIPs(List<String> clusters) {
    if (!CollectionUtils.isEmpty(clusters)) {
        return allIPs(clusters);
    }
    // No clusters requested: fall back to every known cluster.
    List<String> everyCluster = new ArrayList<>(clusterMap.keySet());
    return allIPs(everyCluster);
}
/**
 * Returns a new list holding the persistent instances followed by the ephemeral instances.
 *
 * @return a fresh list; changes to it do not affect the underlying collections
 */
public List<Instance> allIPs() {
    List<Instance> merged = new ArrayList<>(persistentInstances);
    merged.addAll(ephemeralInstances);
    return merged;
}

推送服務實例信息

/**
 * Creates a {@code PushClient} from the given values and registers it through
 * {@code addClient(PushClient)}.
 *
 * @param namespaceId namespace of the service
 * @param serviceName name of the service
 * @param clusters    cluster names as a single string
 * @param agent       user agent of the client
 * @param socketAddr  address of the client
 * @param dataSource  data source handed to the push client
 * @param tenant      tenant handed to the push client
 * @param app         app handed to the push client
 */
public void addClient(String namespaceId, String serviceName, String clusters, String agent,
        InetSocketAddress socketAddr, DataSource dataSource, String tenant, String app) {
    // Build the push client and hand it straight to the registering overload.
    addClient(new PushClient(namespaceId, serviceName, clusters, agent, socketAddr, dataSource, tenant, app));
}

/**
 * Adds a push target client, or refreshes it when an equal one is already registered.
 *
 * @param client the push client to register
 */
public void addClient(PushClient client) {
    // Clients are stored under a key built from namespace id and service name,
    // because notification events are driven by service-name changes.
    final String serviceKey =
            UtilsAndCommons.assembleFullServiceName(client.getNamespaceId(), client.getServiceName());

    ConcurrentMap<String, PushClient> registered = clientMap.get(serviceKey);
    if (registered == null) {
        // First client for this service: create the per-service map, then re-read whichever map is stored.
        clientMap.putIfAbsent(serviceKey, new ConcurrentHashMap<>(1024));
        registered = clientMap.get(serviceKey);
    }

    final String clientKey = client.toString();
    final PushClient existing = registered.get(clientKey);
    if (existing != null) {
        // Already known: just refresh it.
        existing.refresh();
        return;
    }

    final PushClient raced = registered.putIfAbsent(clientKey, client);
    if (raced != null) {
        Loggers.PUSH.warn("client: {} already associated with key {}", raced.getAddrStr(), raced.toString());
    }
    Loggers.PUSH.debug("client: {} added for serviceName: {}", client.getAddrStr(), client.getServiceName());
}

 總結:

返回指定命名空間下內存注冊表中所有的永久實例和臨時實例給客戶端


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM