eureka源碼--服務的注冊、服務續約、服務發現、服務下線、服務剔除、定時任務以及自定義注冊中心的思路


微服務注冊后,在注冊中心的注冊表結構是一個map: ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry,假如一個order服務部署了三台機器,那么Map的第一個key為服務名稱,第二個map的key是實例編號(instance-id),

InstanceInfo該對象封裝了服務的主要信息,例如ip 端口 服務名稱 服務的編號等:

如圖:

 

 

 一、服務的注冊

    1.客戶端源碼(DiscoveryClient類里面):

 

 

      2.服務端的源碼(AbstractInstanceRegistry類):

 

 

 

 二、服務的續約

      1.客戶端源碼(DiscoveryClient類里面):通過發送心跳進行續約,告訴注冊中心我還活着

  

 

 

 2.服務端的源碼(AbstractInstanceRegistry類):

 

 

  三、服務的下線(客戶端關閉時,主動發送消息給注冊中心,注冊中心從注冊表中將該服務實例刪除)

       1.客戶端源碼(DiscoveryClient類里面):

   

 

   

 

       

 

 2.服務端的源碼(AbstractInstanceRegistry類):

 

   四、服務的剔除:當注冊中心服務器一直收不到客戶端的心跳續約超過一定時間限制時,注冊中心會將該服務從注冊表中剔除,該功能只存在注冊中心

            注冊中心源碼(AbstractInstanceRegistry類):

 五、服務的發現:客戶端要向注冊中心拉取注冊列表

 1.客戶端源碼(DiscoveryClient類里面):

 

 

 1.1全量拉取:

 

  

 

 

 

 1.2 增量拉取

 

 

 

 

 

 

 

 2.服務端的源碼(AbstractInstanceRegistry類):

2.1 注冊中心全量拉取:

 

 

 

 

 

 2.2 增量拉取:

 六 、定時器

 1.客戶端會定時向注冊中心發送心跳進行續約以及定時去注冊中心拉取最新的注冊列表信息

 客戶端源碼(DiscoveryClient類的構造器里):省略部分無關代代碼:

@Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
       //此處省略了部分代碼
        try {
            //創建一個線程調度器
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
            //處理心跳的線程池
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
            //拉取注冊表的線程池
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

        //此處省略部分無關代碼
        initScheduledTasks(); //調用該方法,該方法會執行對應的線程池

       
    }
  */
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new DiscoveryClient.CacheRefreshThread() //該方法里面會調用拉取注冊表的方法
            );
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS); //開啟定時任務
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            heartbeatTask = new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread();//該線程會去調用發送心跳方法
            );
            scheduler.schedule(
                    heartbeatTask,
                    renewalIntervalInSecs, TimeUnit.SECONDS); //開啟心跳定時任務
        //此處省略部分無關代碼
    }
 
         

 //拉取注冊列表的線程:

 

 //發送心跳的線程:

 

 

 2. 注冊中心服務剔除定時器(注冊中心源碼(AbstractInstanceRegistry類))

 

 

 

 

 

 總結:eureka注冊中心是去化的,注冊表是存在內存中的,並且客戶端拉取一份注冊表后,會存在本地緩存中,因此即使注冊中心掛了,一樣不影響客戶端相互調用

附加: 客戶端如何獲取服務實例demo

 

 

 

 綜上所述,其實我們也可以自己寫個簡單的注冊中心,思路如下:

1.創建一個springboot項目,寫個controller類,提供注冊,續約,下線,獲取服務列表四個接口

2. 定義一個實例對象Instance,該對象封裝ip,端口 還有更新時間

3.客戶端調用注冊接口,將Instance作為參數傳過來,注冊中心取到對應實例,存到Map<String,Map<String,Instance>> 中

4.客戶端弄個定時器,每個一段時間,調用注冊中心的續約方法,將更新實例的修改時間

5.客戶端弄個定時器,每隔一段時間向注冊中心拉取服務,其實就是拉取Map<String,Map<String,Instance>>

6.注冊中心弄個定時器,每隔一段時間遍歷Map<String,Map<String,Instance>>,找出每個實例中的更新時間,加上過期時間,然后跟當前時間比較,看看有沒過期,如果過期就剔除,也就是在map中刪除

 

 

 

 

 

 
        

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM