Nacos: Service Registration


I. Introduction

1.1 How Nacos works

Image source: https://www.cnblogs.com/wuzhenzhao/p/13625491.html

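1.2 Starting locally

  1. Clone the source: https://github.com/alibaba/nacos.git

After cloning, I personally switch to the master branch, where the code is more complete.

  2. Find the console module, select the Nacos class, and run it. If startup fails, the local config folder may be missing an application.properties file; copying the one from the console module fixes that.
  3. Set standalone mode when starting locally: -Dnacos.standalone=true

1.3 The registry

The registry is really just a Spring Boot project: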

@SpringBootApplication(scanBasePackages = "com.alibaba.nacos")
@ServletComponentScan
@EnableScheduling
public class Nacos {

    public static void main(String[] args) {
        SpringApplication.run(Nacos.class, args);
    }
}

On startup it scans the controller classes under the naming, console, and config packages and stores their methods in methodsCache.

@Component
@EnableScheduling
@PropertySource("/application.properties")
public class ConsoleConfig {

    @Autowired
    private ControllerMethodsCache methodsCache;

    @PostConstruct
    public void init() {
        methodsCache.initClassMethod("com.alibaba.nacos.naming.controllers");
        methodsCache.initClassMethod("com.alibaba.nacos.console.controller");
        methodsCache.initClassMethod("com.alibaba.nacos.config.server.controller");
    }
}

1.4 Client startup classes

These are the classes registered by the META-INF/spring.factories file of the spring-cloud-starter-alibaba-nacos-discovery project:

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\
com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\
com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration

II. Client-side handling of service registration

First, a flow diagram of service registration:

Image source: https://blog.csdn.net/wangwei19871103/article/details/105787403

2.1 Beans instantiated by NacosDiscoveryAutoConfiguration

public class NacosDiscoveryAutoConfiguration {

    @Bean
    public NacosServiceRegistry nacosServiceRegistry(
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosDiscoveryProperties);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(
            NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        return new NacosRegistration(nacosDiscoveryProperties, context);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(
            NacosServiceRegistry registry, // the bean created by the first method above
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) { // the bean created by the second method above
        return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
    }
}

2.1.1 Instantiating NacosDiscoveryProperties

Its values are read from the configuration file:

@ConfigurationProperties("spring.cloud.nacos.discovery")
public class NacosDiscoveryProperties { ... }

2.1.2 Instantiating NacosServiceRegistry

public NacosServiceRegistry(NacosDiscoveryProperties nacosDiscoveryProperties) {
    this.nacosDiscoveryProperties = nacosDiscoveryProperties;
    this.namingService = nacosDiscoveryProperties.namingServiceInstance();
}

namingServiceInstance() then goes through the factory:

  namingService = NacosFactory.createNamingService(getNacosProperties());

  getNacosProperties() adds settings such as the access keys and clusterName;

  the factory's create method instantiates the NamingService through reflection.
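To make the reflection step concrete, here is a minimal sketch of a factory that loads an implementation class by name and invokes its Properties constructor. DemoNamingService, DemoNamingServiceImpl, and ReflectiveFactorySketch are hypothetical stand-ins rather than Nacos classes, and the serverAddr key is used only for illustration.

```java
import java.lang.reflect.Constructor;
import java.util.Properties;

// Hypothetical stand-in for the NamingService interface.
interface DemoNamingService {
    String serverAddr();
}

// Hypothetical stand-in for the concrete implementation.
class DemoNamingServiceImpl implements DemoNamingService {
    private final String serverAddr;

    public DemoNamingServiceImpl(Properties properties) {
        this.serverAddr = properties.getProperty("serverAddr", "localhost:8848");
    }

    @Override
    public String serverAddr() {
        return serverAddr;
    }
}

class ReflectiveFactorySketch {
    // Load the class by name and call its public (Properties) constructor.
    static DemoNamingService create(String className, Properties properties) {
        try {
            Class<?> clazz = Class.forName(className);
            Constructor<?> constructor = clazz.getConstructor(Properties.class);
            return (DemoNamingService) constructor.newInstance(properties);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot create " + className, e);
        }
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("serverAddr", "127.0.0.1:8848");
        DemoNamingService service =
                create(DemoNamingServiceImpl.class.getName(), properties);
        System.out.println(service.serverAddr());
    }
}
```

Creating the instance by class name keeps the caller compiled only against the interface, which is presumably why the factory takes this route.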

2.1.3 Instantiating NacosRegistration

public NacosRegistration(NacosDiscoveryProperties nacosDiscoveryProperties,
        ApplicationContext context) {
    this.nacosDiscoveryProperties = nacosDiscoveryProperties;
    this.context = context;
}

The constructor only stores the two fields; the init method additionally fills in some management and heartbeat information.

2.1.4 Instantiating NacosAutoServiceRegistration

The AutoServiceRegistrationProperties parameter holds a few configuration options.

public NacosAutoServiceRegistration(ServiceRegistry<Registration> serviceRegistry,
        AutoServiceRegistrationProperties autoServiceRegistrationProperties,
        NacosRegistration registration) {
    super(serviceRegistry, autoServiceRegistrationProperties);
    this.registration = registration;
}

2.2 NacosDiscoveryClientAutoConfiguration

@Configuration
@ConditionalOnNacosDiscoveryEnabled
@AutoConfigureBefore({ SimpleDiscoveryClientAutoConfiguration.class,
        CommonsClientAutoConfiguration.class })
public class NacosDiscoveryClientAutoConfiguration {

    @Bean // populated from the configuration file
    @ConditionalOnMissingBean
    public NacosDiscoveryProperties nacosProperties() {
        return new NacosDiscoveryProperties();
    }

    @Bean
    public DiscoveryClient nacosDiscoveryClient(
            NacosDiscoveryProperties discoveryProperties) {
        return new NacosDiscoveryClient(discoveryProperties); // the discovery client for this instance
    }

    @Bean
    @ConditionalOnMissingBean
    @ConditionalOnProperty(value = "spring.cloud.nacos.discovery.watch.enabled", matchIfMissing = true)
    public NacosWatch nacosWatch(NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosWatch(nacosDiscoveryProperties); // publishes heartbeat events once started
    }
}

2.2.1 Instantiating NacosWatch

public NacosWatch(NacosDiscoveryProperties properties) {
    this(properties, getTaskScheduler());
}

@Override
public void start() {
    if (this.running.compareAndSet(false, true)) {
        this.watchFuture = this.taskScheduler.scheduleWithFixedDelay(
                this::nacosServicesWatch, this.properties.getWatchDelay()); // delay of 30000 ms
    }
}

public void nacosServicesWatch() {
    // nacos doesn't support watch now , publish an event every 30 seconds.
    this.publisher.publishEvent(
            new HeartbeatEvent(this, nacosWatchIndex.getAndIncrement()));
}
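The pattern above (a compare-and-set guard around a fixed-delay task that publishes an incrementing index) can be sketched with plain JDK classes. This is only an illustration under assumptions: WatchSketch is a hypothetical name, a LongConsumer stands in for the Spring event publisher, and a ScheduledExecutorService stands in for the Spring TaskScheduler.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

class WatchSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicLong index = new AtomicLong(0);
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final LongConsumer publisher; // stand-in for publishEvent(new HeartbeatEvent(...))
    private final long delayMillis;
    private ScheduledFuture<?> future;

    WatchSketch(LongConsumer publisher, long delayMillis) {
        this.publisher = publisher;
        this.delayMillis = delayMillis;
    }

    // Returns true only for the call that actually started the task.
    boolean start() {
        if (running.compareAndSet(false, true)) {
            future = scheduler.scheduleWithFixedDelay(
                    () -> publisher.accept(index.getAndIncrement()),
                    0, delayMillis, TimeUnit.MILLISECONDS);
            return true;
        }
        return false;
    }

    void stop() {
        if (running.compareAndSet(true, false) && future != null) {
            future.cancel(true);
        }
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        WatchSketch watch = new WatchSketch(
                i -> System.out.println("heartbeat " + i), 50);
        watch.start();
        watch.start(); // ignored: already running
        Thread.sleep(200);
        watch.stop();
    }
}
```

The compareAndSet guard is what makes a second start() call harmless: only one scheduled task ever exists.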

2.3 NacosNamingService

The NamingService created in section 2.1.2 is a NacosNamingService. All the code related to service registration, heartbeats, and so on lives in this class, and its constructor calls the init method.

2.3.1 The init() method

private void init(Properties properties) {
    namespace = InitUtils.initNamespaceForNaming(properties); // resolve the namespace
    initServerAddr(properties); // initialize the registry address
    InitUtils.initWebRootContext();
    initCacheDir(); // initialize the cache directory, by default under /nacos/naming/public
    initLogName(properties); // initialize the log name, naming.log by default

    eventDispatcher = new EventDispatcher(); // starts a background thread that creates NamingEvents

    // The next few are the important ones: BeatReactor is the heartbeat reactor,
    // HostReactor is the host reactor.
    serverProxy = new NamingProxy(namespace, endpoint, serverList);
    serverProxy.setProperties(properties);
    beatReactor = new BeatReactor(serverProxy, initClientBeatThreadCount(properties));
    hostReactor = new HostReactor(eventDispatcher, serverProxy, cacheDir, isLoadCacheAtStart(properties), initPollingThreadCount(properties));
}

2.3.2 EventDispatcher creates NamingEvents

public EventDispatcher() {
    // The ThreadFactory body is omitted in this excerpt.
    executor = Executors.newSingleThreadExecutor(new ThreadFactory() { ... });
    executor.execute(new Notifier());
}

private class Notifier implements Runnable {

    @Override
    public void run() {
        while (true) {
            ServiceInfo serviceInfo = null;
            try {
                // Wait up to 5 minutes for a changed service.
                serviceInfo = changedServices.poll(5, TimeUnit.MINUTES);
            } catch (Exception ignore) {
            }
            if (serviceInfo == null) {
                continue; // timed out, wait again
            }
            List<EventListener> listeners = observerMap.get(serviceInfo.getKey());
            for (EventListener listener : listeners) {
                List<Instance> hosts = Collections.unmodifiableList(serviceInfo.getHosts());
                listener.onEvent(new NamingEvent(serviceInfo.getName(), serviceInfo.getGroupName(), serviceInfo.getClusters(), hosts));
            }
        }
    }
}
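The notifier is a consumer on a blocking queue: changed services are queued, and a single thread polls the queue and fans each change out to the listeners registered for that service. A minimal sketch of one loop iteration follows; DispatcherSketch and its method names are hypothetical, and plain strings stand in for ServiceInfo and NamingEvent.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class DispatcherSketch {
    // Keys of services whose instance list changed.
    private final BlockingQueue<String> changedServices = new LinkedBlockingQueue<>();
    // Listeners per service key.
    private final Map<String, List<Consumer<String>>> observers = new ConcurrentHashMap<>();

    void subscribe(String serviceKey, Consumer<String> listener) {
        observers.computeIfAbsent(serviceKey, k -> new CopyOnWriteArrayList<>()).add(listener);
    }

    void serviceChanged(String serviceKey) {
        changedServices.add(serviceKey);
    }

    // One iteration of the notifier loop; returns how many listeners were notified.
    int dispatchOnce(long timeoutMillis) {
        String key;
        try {
            key = changedServices.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return 0;
        }
        if (key == null) {
            return 0; // timed out without a change
        }
        List<Consumer<String>> listeners = observers.get(key);
        if (listeners == null) {
            return 0; // nobody subscribed to this service
        }
        for (Consumer<String> listener : listeners) {
            listener.accept(key);
        }
        return listeners.size();
    }

    public static void main(String[] args) {
        DispatcherSketch dispatcher = new DispatcherSketch();
        dispatcher.subscribe("DEFAULT_GROUP@@demo", key -> System.out.println("changed: " + key));
        dispatcher.serviceChanged("DEFAULT_GROUP@@demo");
        dispatcher.dispatchOnce(100);
    }
}
```

In the real class this iteration runs inside while (true) on the executor's single thread.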

2.3.3 The InitUtils.initNamespaceForNaming() method

The Nacos data model is identified by a triple: Namespace, Group, and dataId. The method below resolves the Namespace and returns public by default.

/* Initialize the namespace for naming. Config initialization works differently, so it cannot be reused directly. */
public static String initNamespaceForNaming(Properties properties) {
    String tmpNamespace = null;

    String isUseCloudNamespaceParsing =
            properties.getProperty(PropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
                    System.getProperty(SystemPropertyKeyConst.IS_USE_CLOUD_NAMESPACE_PARSING,
                            String.valueOf(Constants.DEFAULT_USE_CLOUD_NAMESPACE_PARSING)));

    if (Boolean.valueOf(isUseCloudNamespaceParsing)) {
        tmpNamespace = TenantUtil.getUserTenantForAns();
        tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
            @Override
            public String call() {
                String namespace = System.getProperty(SystemPropertyKeyConst.ANS_NAMESPACE);
                return namespace;
            }
        });

        tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
            @Override
            public String call() {
                String namespace = System.getenv(PropertyKeyConst.SystemEnv.ALIBABA_ALIWARE_NAMESPACE);
                return namespace;
            }
        });
    }

    tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
        @Override
        public String call() {
            String namespace = System.getProperty(PropertyKeyConst.NAMESPACE);
            return namespace;
        }
    });

    if (StringUtils.isEmpty(tmpNamespace) && properties != null) {
        tmpNamespace = properties.getProperty(PropertyKeyConst.NAMESPACE);
    }

    tmpNamespace = TemplateUtils.stringEmptyAndThenExecute(tmpNamespace, new Callable<String>() {
        @Override
        public String call() {
            return UtilAndComs.DEFAULT_NAMESPACE_ID; // returns "public" by default
        }
    });
    return tmpNamespace;
}
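Stripped of the anonymous classes, the method is a fallback chain: try each source in order and keep the first non-empty value, ending with "public". A minimal sketch of that idea follows; NamespaceFallbackSketch and the demo.namespace key are hypothetical, and the cloud-specific sources are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.function.Supplier;

class NamespaceFallbackSketch {
    // Hypothetical key, chosen so the example does not collide with real settings.
    static final String KEY = "demo.namespace";

    // Return the first non-empty value, or the default when every source is empty.
    static String firstNonEmpty(List<Supplier<String>> sources, String defaultValue) {
        for (Supplier<String> source : sources) {
            String value = source.get();
            if (value != null && !value.trim().isEmpty()) {
                return value;
            }
        }
        return defaultValue;
    }

    static String resolveNamespace(Properties properties) {
        List<Supplier<String>> sources = new ArrayList<>();
        sources.add(() -> System.getProperty(KEY));                                   // JVM system property first
        sources.add(() -> properties == null ? null : properties.getProperty(KEY));  // then client properties
        return firstNonEmpty(sources, "public");                                      // finally the default
    }

    public static void main(String[] args) {
        Properties properties = new Properties();
        System.out.println(resolveNamespace(properties));
        properties.setProperty(KEY, "dev");
        System.out.println(resolveNamespace(properties));
    }
}
```

Because each source is a Supplier, later sources are not evaluated once an earlier one has produced a value, which matches how stringEmptyAndThenExecute only runs its callback when the current value is empty.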

2.3.4 The NamingProxy constructor

public NamingProxy(String namespaceId, String endpoint, String serverList) {
    this.namespaceId = namespaceId;
    this.endpoint = endpoint;
    if (StringUtils.isNotEmpty(serverList)) {
        this.serverList = Arrays.asList(serverList.split(","));
        if (this.serverList.size() == 1) {
            this.nacosDomain = serverList;
        }
    }
    initRefreshSrvIfNeed();
}
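The constructor's handling of the server list is small but easy to misread: a comma-separated string becomes a list, and only when it holds exactly one entry is that entry also treated as the Nacos domain. A minimal sketch, with ServerListSketch and its method names being hypothetical:

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ServerListSketch {
    // Split "host1:8848,host2:8848" into its entries.
    static List<String> parse(String serverList) {
        if (serverList == null || serverList.isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(serverList.split(","));
    }

    // A single entry doubles as the domain; otherwise there is no domain.
    static String singleDomain(String serverList) {
        return parse(serverList).size() == 1 ? serverList : null;
    }

    public static void main(String[] args) {
        System.out.println(parse("10.0.0.1:8848,10.0.0.2:8848"));
        System.out.println(singleDomain("nacos.example.internal:8848"));
    }
}
```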

The internal method initRefreshSrvIfNeed:

private void initRefreshSrvIfNeed() {
    // Returns immediately if endpoint is empty (check omitted in this excerpt).
    ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setName("com.alibaba.nacos.client.naming.serverlist.updater");
            t.setDaemon(true);
            return t;
        }
    });

    // Start a scheduled thread that periodically refreshes the server list (every 30 s).
    executorService.scheduleWithFixedDelay(new Runnable() {
        @Override
        public void run() {
            refreshSrvIfNeed();
        }
    }, 0, vipSrvRefInterMillis, TimeUnit.MILLISECONDS);

    refreshSrvIfNeed();
}

2.4 NacosAutoServiceRegistration

2.4.1 NacosWatch is given the event publisher

2.4.2 Spring Boot calls finishRefresh on startup

@Override
protected void finishRefresh() {
    super.finishRefresh();
    WebServer webServer = startWebServer();
    if (webServer != null) {
        publishEvent(new ServletWebServerInitializedEvent(webServer, this));
    }
}

2.4.3 NacosAutoServiceRegistration binds to the event above

public class NacosAutoServiceRegistration
        extends AbstractAutoServiceRegistration<Registration> { ... }

Methods in the parent class:

@Override
@SuppressWarnings("deprecation")
public void onApplicationEvent(WebServerInitializedEvent event) {
    bind(event);
}

@Deprecated
public void bind(WebServerInitializedEvent event) {
    this.start();
}

bind in turn calls start:

public void start() {
    if (!isEnabled()) {
        return;
    }
    if (!this.running.get()) {
        this.context.publishEvent(
                new InstancePreRegisteredEvent(this, getRegistration())); // publish the pre-registration event
        register(); // the actual registration call
        if (shouldRegisterManagement()) {
            registerManagement();
        }
        this.context.publishEvent(
                new InstanceRegisteredEvent<>(this, getConfiguration())); // publish the registered event
        this.running.compareAndSet(false, true);
    }
}
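The chain above boils down to: the web server is ready, so the listener fires, so start() runs once and registers. A minimal sketch of that lifecycle, where AutoRegistrationSketch is a hypothetical class and a list of strings records the steps that would be events or calls in the real code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class AutoRegistrationSketch {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final List<String> steps = new ArrayList<>();
    private final boolean enabled;

    AutoRegistrationSketch(boolean enabled) {
        this.enabled = enabled;
    }

    // Stand-in for onApplicationEvent(WebServerInitializedEvent) -> bind -> start.
    void onWebServerInitialized() {
        start();
    }

    void start() {
        if (!enabled) {
            return;
        }
        if (!running.get()) {
            steps.add("InstancePreRegisteredEvent"); // before registering
            steps.add("register");                   // the registration itself
            steps.add("InstanceRegisteredEvent");    // after registering
            running.compareAndSet(false, true);
        }
    }

    List<String> steps() {
        return steps;
    }

    public static void main(String[] args) {
        AutoRegistrationSketch registration = new AutoRegistrationSketch(true);
        registration.onWebServerInitialized();
        registration.onWebServerInitialized(); // second event is a no-op
        System.out.println(registration.steps());
    }
}
```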

2.5 NacosServiceRegistry

2.5.1 Registration in NacosServiceRegistry

@Override
public void register(Registration registration) {
    // the current application's application.name
    String serviceId = registration.getServiceId();
    // the service instance information
    Instance instance = getNacosInstanceFromRegistration(registration);
    // register through the naming service
    namingService.registerInstance(serviceId, instance);
}

2.5.2 NacosNamingService registers the instance

It does two things:

  1. If the instance being registered is ephemeral, it builds the heartbeat information and hands it to the beat reactor, which creates the heartbeat task.
  2. It calls registerService to send the registration request.

@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    // For an ephemeral instance, build the heartbeat information.
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        long instanceInterval = instance.getInstanceHeartBeatInterval();
        beatInfo.setPeriod(instanceInterval == 0 ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
        // Start the scheduled heartbeat task for this beat info.
        beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    // Register through the server proxy.
    serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}

addBeatInfo schedules the following task:

class BeatTask implements Runnable {

    BeatInfo beatInfo;

    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }

    @Override
    public void run() {
        long result = serverProxy.sendBeat(beatInfo);
        long nextTime = result > 0 ? result : beatInfo.getPeriod();
        // Schedule the next beat after the given delay.
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}
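BeatTask does not use a fixed-rate schedule. Each run sends one beat and then schedules the next run itself, using the interval the server returned when it is positive and the local period otherwise. A minimal sketch of this self-rescheduling pattern follows; BeatSketch is hypothetical, sendBeat is a stub that always returns 0, and the run is capped at three beats so the example terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class BeatSketch {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private final AtomicInteger beatsSent = new AtomicInteger();

    // Prefer the interval suggested by the server; fall back to the local period.
    static long nextDelay(long serverInterval, long defaultPeriod) {
        return serverInterval > 0 ? serverInterval : defaultPeriod;
    }

    // Stub for serverProxy.sendBeat: returns the server's interval, 0 meaning "none".
    private long sendBeat() {
        beatsSent.incrementAndGet();
        return 0L;
    }

    // Each run sends a beat, then schedules the following run itself.
    void schedule(long delayMillis, int remaining, CountDownLatch done) {
        executor.schedule(() -> {
            long next = nextDelay(sendBeat(), delayMillis);
            done.countDown();
            if (remaining > 1) {
                schedule(next, remaining - 1, done);
            }
        }, delayMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        BeatSketch sketch = new BeatSketch();
        CountDownLatch done = new CountDownLatch(3);
        sketch.schedule(10, 3, done);
        done.await();
        sketch.executor.shutdown();
        System.out.println("beats sent: " + sketch.beatsSent.get());
    }
}
```

Rescheduling from inside the task is what lets the server change the client's beat interval at runtime.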

   

Next comes registerService, whose logic is simple:

public void registerService(String serviceName, String groupName, Instance instance) {
    final Map<String, String> params = new HashMap<String, String>(9);
    // Fill params with namespaceId, serviceName, groupName, ip, port, ephemeral, and so on (omitted).
    // Call the API.
    reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, HttpMethod.POST);
}

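reqAPI is called next. While registering, the client cycles through the configured registry addresses.

reqAPI picks a random server as its starting point and tries each configured server in turn until one call succeeds. When no server list is available it falls back to the single domain, retrying up to 3 times (REQUEST_DOMAIN_RETRY_COUNT). If registration still fails, it throws an exception.

public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
    params.put(CommonParams.NAMESPACE_ID, getNamespaceId());
    Exception exception = new Exception();
    if (servers != null && !servers.isEmpty()) {
        // Pick a random server node to start from.
        Random random = new Random(System.currentTimeMillis());
        int index = random.nextInt(servers.size());
        // Walk the server list.
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index); // the server at the current index
            try {
                return callServer(api, params, server, method); // call that server
            } catch (NacosException e) { ... }
            index = (index + 1) % servers.size(); // move on to the next one
        }
    }
    for (int i = 0; i < UtilAndComs.REQUEST_DOMAIN_RETRY_COUNT; i++) {
        try {
            return callServer(api, params, nacosDomain);
        } catch (Exception e) { ... }
    }
    // Every attempt failed: an exception is thrown here (omitted in this excerpt).
}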
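The server-selection loop in reqAPI (start at a given index, then wrap around until a call succeeds) can be sketched on its own. This is a simplified illustration under assumptions: FailoverSketch is a hypothetical name, a Function stands in for callServer, the start index is passed in rather than drawn from Random so the behavior is reproducible, and failures are modeled as RuntimeException.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class FailoverSketch {
    // Try every server once, beginning at startIndex and wrapping around.
    static String callWithFailover(List<String> servers, int startIndex,
                                   Function<String, String> call) {
        RuntimeException last = null;
        int index = startIndex;
        for (int i = 0; i < servers.size(); i++) {
            String server = servers.get(index);
            try {
                return call.apply(server);          // first success wins
            } catch (RuntimeException e) {
                last = e;                           // remember the failure, try the next server
            }
            index = (index + 1) % servers.size();
        }
        throw new IllegalStateException("all servers failed: " + servers, last);
    }

    public static void main(String[] args) {
        List<String> servers = Arrays.asList("a:8848", "b:8848", "c:8848");
        String result = callWithFailover(servers, 1, server -> {
            if (!server.startsWith("a")) {
                throw new RuntimeException(server + " is down");
            }
            return "ok from " + server;
        });
        System.out.println(result);
    }
}
```

Starting from a random index spreads registration traffic across the cluster, while the wrap-around guarantees every node gets one chance before the call is reported as failed.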

callServer(api, params, server, method) issues the actual call through HttpURLConnection.

public String callServer(String api, Map<String, String> params, String curServer, String method) {
    HttpClient.HttpResult result = HttpClient.request(url, headers, params, UtilAndComs.ENCODING, method);
}

Debugging shows a request to http://xxxx:8848/nacos/v1/ns/instance.

There is also a scheduled request to /nacos/v1/ns/instance/beat.

III. Registry-side handling in Nacos

3.1 The InstanceController.register() endpoint

@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    // namespaceId and serviceName are also read from the request (omitted in this excerpt).
    final Instance instance = parseInstance(request); // parse the instance from the request
    // Register the service through ServiceManager.
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

3.1.1 The registerInstance method

serviceManager is the class that manages the life cycle of services.

public void registerInstance(String namespaceId, String serviceName, Instance instance) {
    // Create an empty service; in effect this initializes an entry in a ConcurrentHashMap.
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    // Look up the service object in serviceMap by namespaceId and serviceName.
    Service service = getService(namespaceId, serviceName);
    // Call addInstance to add the instance; the data goes through the consistency protocol.
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

3.1.2 The createServiceIfAbsent method

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) {
    Service service = getService(namespaceId, serviceName); // look up the service in serviceMap
    if (service == null) { // not found, so initialize it
        service = new Service();
        // ... (some code omitted)
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}

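Nacos organizes services by namespace. Each namespace contains groups, each group contains its Services, and the serviceName then identifies the service whose instances are looked up.

The first request for a service goes through this initialization, which ends by calling putServiceAndInit.

/** Map(namespace, Map(group::serviceName, Service)). */
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();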
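A minimal sketch of this two-level registry follows. ServiceMapSketch and DemoService are hypothetical, and computeIfAbsent is my substitution for the explicit null check shown in createServiceIfAbsent; it is not how the Nacos source populates the map.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class ServiceMapSketch {
    // Hypothetical stand-in for the Service class.
    static final class DemoService {
        final String namespaceId;
        final String name;

        DemoService(String namespaceId, String name) {
            this.namespaceId = namespaceId;
            this.name = name;
        }
    }

    // Map(namespace, Map(group@@serviceName, service)).
    private final Map<String, Map<String, DemoService>> serviceMap = new ConcurrentHashMap<>();

    DemoService getService(String namespaceId, String serviceName) {
        Map<String, DemoService> services = serviceMap.get(namespaceId);
        return services == null ? null : services.get(serviceName);
    }

    // Create the service on first use; later calls return the same object.
    DemoService createServiceIfAbsent(String namespaceId, String serviceName) {
        return serviceMap
                .computeIfAbsent(namespaceId, k -> new ConcurrentHashMap<>())
                .computeIfAbsent(serviceName, k -> new DemoService(namespaceId, serviceName));
    }

    public static void main(String[] args) {
        ServiceMapSketch manager = new ServiceMapSketch();
        DemoService service = manager.createServiceIfAbsent("public", "DEFAULT_GROUP@@demo");
        System.out.println(service.namespaceId + " / " + service.name);
    }
}
```

The group is folded into the inner key (group@@serviceName), which is why the map has only two levels even though the model has three.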

   

The putServiceAndInit method:

private void putServiceAndInit(Service service) throws NacosException {
    putService(service); // store the service in serviceMap
    service.init(); // set up the heartbeat check
    // Listen for data consistency: ephemeral=true means the Distro protocol, false means raft.
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
}

3.1.3 The addInstance method

Once the service exists, addInstance is called. It adds the instance to the service's instance list and then synchronizes the data through the consistency protocol.

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) {
    // Build the key, for example:
    // com.alibaba.nacos.naming.iplist.ephemeral.public##DEFAULT_GROUP@@nacos-config-client
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    Service service = getService(namespaceId, serviceName); // the service that was just created
    synchronized (service) {
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        // Store the instances through the consistency service the service started listening on in the previous step.
        consistencyService.put(key, instances);
    }
}
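The example key in the comment above shows how the pieces are joined. The sketch below rebuilds that exact string; InstanceListKeySketch is a hypothetical class, the constants are read off the example key, and the layout of the non-ephemeral key (the same string without the "ephemeral." segment) is an assumption rather than something shown in this article.

```java
class InstanceListKeySketch {
    static final String PREFIX = "com.alibaba.nacos.naming.iplist.";
    static final String EPHEMERAL = "ephemeral.";
    static final String NAMESPACE_CONNECTOR = "##";

    // prefix + optional "ephemeral." + namespace + "##" + group@@serviceName
    static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
        return PREFIX + (ephemeral ? EPHEMERAL : "") + namespaceId + NAMESPACE_CONNECTOR + serviceName;
    }

    // The key itself tells which consistency protocol handles it.
    static boolean isEphemeralKey(String key) {
        return key.startsWith(PREFIX + EPHEMERAL);
    }

    public static void main(String[] args) {
        String key = buildInstanceListKey("public", "DEFAULT_GROUP@@nacos-config-client", true);
        System.out.println(key);
        System.out.println(isEphemeralKey(key));
    }
}
```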

The registry then sends the success response back to the registering client (return "ok"), which completes the service registration flow.

3.2 The InstanceController.beat() endpoint

@CanDistro
@PutMapping("/beat")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public ObjectNode beat(HttpServletRequest request) throws Exception {
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, switchDomain.getClientBeatInterval());

    String beat = WebUtils.optional(request, "beat", StringUtils.EMPTY); // parse the beat parameter
    RsInfo clientBeat = null;
    if (StringUtils.isNotBlank(beat)) {
        clientBeat = JacksonUtils.toObj(beat, RsInfo.class);
    }

    // Read namespaceId, serviceName, clusterName, ip, port, and so on (omitted).

    // Look up the instance.
    Instance instance = serviceManager.getInstance(namespaceId, serviceName, clusterName, ip, port);

    // If no instance is found, create one.
    if (instance == null) {
        instance = new Instance();
        // Fill in the instance's ip, port, and other fields (omitted).
        // Register the instance.
        serviceManager.registerInstance(namespaceId, serviceName, instance);
    }

    Service service = serviceManager.getService(namespaceId, serviceName);
    if (clientBeat == null) {
        clientBeat = new RsInfo();
        clientBeat.setIp(ip);
        clientBeat.setPort(port);
        clientBeat.setCluster(clusterName);
    }

    // processClientBeat is analyzed separately in the next chapter; it mainly renews the client's lease.
    service.processClientBeat(clientBeat);

    result.put(CommonParams.CODE, NamingResponseCode.OK);
    if (instance.containsMetadata(PreservedMetadataKeys.HEART_BEAT_INTERVAL)) {
        result.put(SwitchEntry.CLIENT_BEAT_INTERVAL, instance.getInstanceHeartBeatInterval());
    }
    result.put(SwitchEntry.LIGHT_BEAT_ENABLED, switchDomain.isLightBeatEnabled());
    return result; // the heartbeat response sent back to the client
}
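The notable behavior of this endpoint is that a beat from an unknown instance registers it, while a beat from a known instance only renews it. A minimal sketch of that idea follows; BeatHandlerSketch is hypothetical, instances are keyed by "ip:port" for simplicity, and the timeout passed to isHealthy is an arbitrary example value, not a Nacos default.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class BeatHandlerSketch {
    // Last beat time per instance, keyed by "ip:port".
    private final Map<String, Long> lastBeatMillis = new ConcurrentHashMap<>();

    // Record a beat; returns true when the beat registered a previously unknown instance.
    boolean onBeat(String ip, int port, long nowMillis) {
        Long previous = lastBeatMillis.put(ip + ":" + port, nowMillis);
        return previous == null;
    }

    // An instance counts as healthy while its last beat is recent enough.
    boolean isHealthy(String ip, int port, long nowMillis, long timeoutMillis) {
        Long last = lastBeatMillis.get(ip + ":" + port);
        return last != null && nowMillis - last <= timeoutMillis;
    }

    public static void main(String[] args) {
        BeatHandlerSketch handler = new BeatHandlerSketch();
        System.out.println(handler.onBeat("10.0.0.1", 8080, 1_000L)); // first beat registers
        System.out.println(handler.onBeat("10.0.0.1", 8080, 6_000L)); // later beats renew
        System.out.println(handler.isHealthy("10.0.0.1", 8080, 7_000L, 15_000L));
    }
}
```

Registering on an unknown beat means a client recovers on its own after the registry loses its ephemeral data: the next heartbeat restores the instance.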

   

   

   

References:

https://blog.csdn.net/V_zxw/article/details/108244306

https://www.cnblogs.com/wuzhenzhao/p/13625491.html

