eureka源码--服务的注册、服务续约、服务发现、服务下线、服务剔除、定时任务以及自定义注册中心的思路


微服务注册后,在注册中心的注册表结构是一个map: ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry,假如一个order服务部署了三台机器,那么Map的第一个key为服务名称,第二个map的key是实例编号(instance-id),

InstanceInfo该对象封装了服务的主要信息,例如ip 端口 服务名称 服务的编号等:

如图:

 

 

 一、服务的注册

    1.客户端源码(DiscoveryClient类里面):

 

 

      2.服务端的源码(AbstractInstanceRegistry类):

 

 

 

 二、服务的续约

      1.客户端源码(DiscoveryClient类里面):通过发送心跳进行续约,告诉注册中心我还活着

  

 

 

 2.服务端的源码(AbstractInstanceRegistry类):

 

 

  三、服务的下线(客户端关闭时,主动发送消息给注册中心,注册中心从注册表中将该服务实例删除)

       1.客户端源码(DiscoveryClient类里面):

   

 

   

 

       

 

 2.服务端的源码(AbstractInstanceRegistry类):

 

   四、服务的剔除:当注册中心服务器一直收不到客户端的心跳续约超过一定时间限制时,注册中心会将该服务从注册表中剔除,该功能只存在注册中心

            注册中心源码(AbstractInstanceRegistry类):

 五、服务的发现:客户端要向注册中心拉取注册列表

 1.客户端源码(DiscoveryClient类里面):

 

 

 1.1全量拉取:

 

  

 

 

 

 1.2 增量拉取

 

 

 

 

 

 

 

 2.服务端的源码(AbstractInstanceRegistry类):

2.1 注册中心全量拉取:

 

 

 

 

 

 2.2 增量拉取:

 六 、定时器

 1.客户端会定时向注册中心发送心跳进行续约以及定时去注册中心拉取最新的注册列表信息

 客户端源码(DiscoveryClient类的构造器里):省略部分无关代代码:

@Inject
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
       //此处省略了部分代码
        try {
            //创建一个线程调度器
            scheduler = Executors.newScheduledThreadPool(2,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
            //处理心跳的线程池
            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff
            //拉取注册表的线程池
            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>(),
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                            .setDaemon(true)
                            .build()
            );  // use direct handoff

        //此处省略部分无关代码
        initScheduledTasks(); //调用该方法,该方法会执行对应的线程池

       
    }
  */
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new DiscoveryClient.CacheRefreshThread() //该方法里面会调用拉取注册表的方法
            );
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS); //开启定时任务
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            heartbeatTask = new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread();//该线程会去调用发送心跳方法
            );
            scheduler.schedule(
                    heartbeatTask,
                    renewalIntervalInSecs, TimeUnit.SECONDS); //开启心跳定时任务
        //此处省略部分无关代码
    }
 
 

 //拉取注册列表的线程:

 

 //发送心跳的线程:

 

 

 2. 注册中心服务剔除定时器(注册中心源码(AbstractInstanceRegistry类))

 

 

 

 

 

 总结:eureka注册中心是去化的,注册表是存在内存中的,并且客户端拉取一份注册表后,会存在本地缓存中,因此即使注册中心挂了,一样不影响客户端相互调用

附加: 客户端如何获取服务实例demo

 

 

 

 综上所述,其实我们也可以自己写个简单的注册中心,思路如下:

1.创建一个springboot项目,写个controller类,提供注册,续约,下线,获取服务列表四个接口

2. 定义一个实例对象Instance,该对象封装ip,端口 还有更新时间

3.客户端调用注册接口,将Instance作为参数传过来,注册中心取到对应实例,存到Map<String,Map<String,Instance>> 中

4.客户端弄个定时器,每个一段时间,调用注册中心的续约方法,将更新实例的修改时间

5.客户端弄个定时器,每隔一段时间向注册中心拉取服务,其实就是拉取Map<String,Map<String,Instance>>

6.注册中心弄个定时器,每隔一段时间遍历Map<String,Map<String,Instance>>,找出每个实例中的更新时间,加上过期时间,然后跟当前时间比较,看看有没过期,如果过期就剔除,也就是在map中删除

 

 

 

 

 

 

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM