服務注冊和發現之Eureka原理篇


概念

在傳統應用組件間調用,通過接口規范約束來實現的,從而實現不同模塊間良好協作;但是被拆分成微服務后,每個微服務實例的數量和網絡地址都可能動態變化,使得原來硬編碼的地址極不方便,故需要一個中心化的組件來進行服務的登記和管理。

服務注冊中心:實現服務治理,管理所有的服務信息和狀態。

注冊中心好處:不用關心服務提供方數量、地址等細節。

注冊中心技術棧:Eureka、Nacos、Consul、Zookeeper等。

服務注冊與發現包括兩部分:一個是服務器端,另一個是客戶端

Server是一個公共服務,為Client提供服務注冊和發現的功能,維護注冊到自身的Client的相關信息,同時提供接口給Client獲取注冊表中其他服務的信息,使得動態變化的Client能夠進行服務間的相互調用。

Client將自己的服務信息通過一定的方式登記到Server上,並在正常范圍內維護自己信息一致性,方便其他服務發現自己,同時可以通過Server獲取到自己依賴的其他服務信息,完成服務調用,還內置了負載均衡器,用來進行基本的負載均衡。

Spring Cloud以Eureka作為服務注冊中心,是一個RESTful風格服務,是服務注冊和發現的基礎組件,它屏蔽了Server和Client的交互細節,使得開發者將精力放在業務上。

使用

Eureka單節點搭建

服務端

  1. 引入pom

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
    </dependency>
    
  2. application.yml

    eureka: 
      client:
        #是否將自己注冊到Eureka Server,默認為true,由於當前就是server,故而設置成false,表明該服務不會向eureka注冊自己的信息
        register-with-eureka: false
        #是否從eureka server獲取注冊信息,由於單節點,不需要同步其他節點數據,用false
        fetch-registry: false
        #設置服務注冊中心的URL,用於client和server端交流
        service-url:                      
          defaultZone: http://username:pwd@localhost:7901/eureka/
    
  3. 代碼

    //啟動類增加@EnableEurekaServer 標識該服務為注冊中心服務端
    @SpringBootApplication
    public class EurekaServerApplication {
        public static void main(String[] args) {
            SpringApplication.run(EurekaServerApplication.class, args);
        }
    }
    

客戶端

  1. 引入pom

    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    
  2. application.yml

    #注冊中心
    eureka: 
      client:
        #設置服務注冊中心的URL
        service-url:                      
          defaultZone: http://username:pwd@localhost:7901/eureka/
    

Eureka也支持高可用,集群環境 搭建與單節點環境類似,需要注意客戶端配置地址defaultZone時候,盡量寫多個地址(寫一個也行,EurekaServer注冊表會自動同步,避免極端情況數據同步(Eureka是AP模型)不及時)

原理

注冊中心本質:存儲每個客戶端的注冊信息,EurekaClient從EurekaServer同步獲取服務注冊列表,通過一定的規則選擇一個服務進行調用。Eureka架構圖如下:

服務提供者:是一個Eureka client,向 Eureka Server注冊和更細自己的信息,同時能從Eureka Server注冊表中獲取到其他服務的信息。

服務注冊中心:提供服務注冊和發現的功能。每個Eureka Client向Eureka Server注冊自己的信息,也可以通過Eureka Server獲取到其他服務的信息達到發現和調用其他服務的目的。

服務消費者:是一個Eureka client,通過Eureka Server獲取注冊的其他服務信息,從而找到所需要的服務發起遠程調用。

注冊:服務提供者向Eureka Server端注冊自身的元數據以供服務發現。

續約:通過發送心跳到Server以維持和更新注冊表中服務實例元數據的有效性。在一定時長內,Server沒有收到Client的心跳信息,將默認下線,會把服務實例信息從注冊表中刪除。

下線:服務提供方在關閉時候主動向Server注銷服務實例元數據,這時服務提供方實例數據將從Server的注冊表中刪除。

獲取注冊表:服務消費者Client向Server請求注冊表信息,用於服務發現,從而發起遠程調用。

源碼

Eureka Client

Eureka Client工作流程

  • Eureka Client通過SpringBoot自動裝配,加載相關類,META-INF/spring.factories如下配置:
EurekaClientAutoConfiguration:Eureka client自動配置類,負責client中關鍵beans的配置和初始化
RibbonEurekaAutoConfiguration:Ribbon負載均衡相關配置
EurekaDiscoveryClientConfiguration:配置自動注冊和應用的健康檢查器
  • Eureka 相關配置類
//EurekaClientConfigBean:封裝了Eureka Client和Eureka Server交互所需要的配置信息
//EurekaInstanceConfigBean:封裝了EurekaClient自身服務實例的配置信息,主要用於構建InstanceInfo
public void initComponent(EurekaInstanceConfig config) {
    try {
        this.config = config;
        this.instanceInfo = new EurekaConfigBasedInstanceInfoProvider(config).get();
    } catch (Throwable e) {
    	throw new RuntimeException("Failed to initialize ApplicationInfoManager", e);
    }
}
  • Eureka客戶端com.netflix.discovery.DiscoveryClient

    該類是Eureka Client核心類 實現了EurekaClient接口(EurekaClient繼承LookupService接口),關鍵代碼如下:

  //構造方法
  DiscoveryClient(ApplicationInfoManager applicationInfoManager,EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
  	//初始化一些參數線程池等
      ...
      //1.根據fetch-registry配置參數拉取注冊表信息
      if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
              fetchRegistryFromBackup();
        } 
      //2.根據register-with-eureka配置參數向服務端注冊
      if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
                  try {
                      if (!register() ) {
                          throw new IllegalStateException("Registration error at startup. Invalid server response.");
                      }
                  } catch (Throwable th) {
                      logger.error("Registration error at startup: {}", th.getMessage());
                      throw new IllegalStateException(th);
                  }
              }
      //3.初始化定時任務 ①用於發送心跳 ②用於刷新緩存 ③按需注冊事件向server注冊
      initScheduledTasks();
      ...
  }

拉取注冊表信息

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
	if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) 
            {
                ...
                //全量拉取
                getAndStoreFullRegistry();
            } else {
        		//增量拉取
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
    ...
}
//拉取方式有兩種:全量和增量
//1.全量拉取
private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        logger.info("Getting all instance registry info from the eureka server");

        Applications apps = null;
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());

        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        //注意:AtomicLong cas進行版本管理
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            //保留UP狀態服務
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }
//2.增量拉取
private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();

        Applications delta = null;
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }

        if (delta == null) {
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    updateDelta(delta);
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            // There is a diff in number of instances for some reason
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }

服務注冊

boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            //發送InstanceInfo信息到Server端
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }

初始化三個定時任務

private void initScheduledTasks() {
        //1.根據fetch-registry配置參數開啟定時任務刷新client注冊表緩存
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            cacheRefreshTask = new TimedSupervisorTask(
                    "cacheRefresh",
                    scheduler,
                    cacheRefreshExecutor,
                    registryFetchIntervalSeconds,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new CacheRefreshThread()
            );
            scheduler.schedule(
                    cacheRefreshTask,
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
	//2.根據register-with-eureka配置參數開啟心跳監測
        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            heartbeatTask = new TimedSupervisorTask(
                    "heartbeat",
                    scheduler,
                    heartbeatExecutor,
                    renewalIntervalInSecs,
                    TimeUnit.SECONDS,
                    expBackOffBound,
                    new HeartbeatThread()
            );
            scheduler.schedule(
                    heartbeatTask,
                    renewalIntervalInSecs, TimeUnit.SECONDS);
	//3.1 注冊狀態改變監聽器,在應用狀態發生變化時,刷新服務實例信息,在服務實例信息發生改變時向server注冊
            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
		
 //3.2 定時刷新服務實例信息和檢查應用狀態的變化,在服務實例信息發生改變的情況下向server重新發起注冊
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

續租

boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            //服務端發送心跳包
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                //若不存在 則發送注冊
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

服務下線

@PreDestroy
@Override
public synchronized void shutdown() {
    if (isShutdown.compareAndSet(false, true)) {
        logger.info("Shutting down DiscoveryClient ...");

        if (statusChangeListener != null && applicationInfoManager != null) {
            applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
        }
		//清除定時任務
        cancelScheduledTasks();
		
        //取消注冊
        // If APPINFO was registered
        if (applicationInfoManager != null
            && clientConfig.shouldRegisterWithEureka()
            && clientConfig.shouldUnregisterOnShutdown()) {
            applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
            unregister();
        }
		//關閉與服務端通訊連接
        if (eurekaTransport != null) {
            eurekaTransport.shutdown();
        }
		
        heartbeatStalenessMonitor.shutdown();
        registryStalenessMonitor.shutdown();

        Monitors.unregisterObject(this);

        logger.info("Completed shut down of DiscoveryClient");
    }
}

Eureka Server

Server主要有以下幾個功能:接受服務注冊、接受服務心跳、服務剔除、服務下線、集群同步、獲取注冊表中服務實例信息

Eureka Server同時也是一個Eureka Client,在不禁止Eureka Server的客戶端行為時,它會向它配置文件中的其他Eureka Server進行拉取注冊表、服務注冊和發送心跳等操作。

  • Eureka Server通過SpringBoot自動裝配,加載相關類,META-INF/spring.factories如下配置:
EurekaServerAutoConfiguration:向spring的bean工廠添加eureka-server相關功能的bean

@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
		InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
    
}
EurekaServerAutoConfiguration生效條件 EurekaServerMarkerConfiguration.Marker所以服務端需要加上@EnableEurekaServer注解

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}
  • Eureka Server初始化類 EurekaServerInitializerConfiguration
public void start() {
		new Thread(() -> {
			try {
				//初始化eureka的上下文環境
				eurekaServerBootstrap.contextInitialized(
						EurekaServerInitializerConfiguration.this.servletContext);
				log.info("Started Eureka Server");
				//發布服務端可注冊事件通知
				publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
				EurekaServerInitializerConfiguration.this.running = true;
				//發布Eureka Server已啟動事件通知
                publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
			}
			catch (Exception ex) {
				// Help!
				log.error("Could not initialize Eureka servlet context", ex);
			}
		}).start();
}

EurekaServerBootstrap:啟動類
public void contextInitialized(ServletContext context) {
		try {
			//初始化eureka環境
			initEurekaEnvironment();
			//初始化eureka context 其中兩個主要功能如下
			initEurekaServerContext();

			context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
		}
		catch (Throwable e) {
			log.error("Cannot bootstrap eureka server :", e);
			throw new RuntimeException("Cannot bootstrap eureka server :", e);
		}
	}

protected void initEurekaServerContext() throws Exception {
		// For backward compatibility
		JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
				XStream.PRIORITY_VERY_HIGH);
		XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
				XStream.PRIORITY_VERY_HIGH);

		if (isAws(this.applicationInfoManager.getInfo())) {
			this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
					this.eurekaClientConfig, this.registry, this.applicationInfoManager);
			this.awsBinder.start();
		}

		EurekaServerContextHolder.initialize(this.serverContext);

		log.info("Initialized server context");

		//從相鄰的eureka節點復制注冊表
		int registryCount = this.registry.syncUp();
		//與client 交換信息 其中包括 服務剔除定時任務開啟 openForTraffic()>super.postInit()>EvictionTask
		this.registry.openForTraffic(this.applicationInfoManager, registryCount);
		// Register all monitoring statistics.
		EurekaMonitors.registerAllStats();
	}

  • Server是圍繞注冊表管理的,有兩個InstanceRegistry

com.netflix.eureka.registry.InstanceRegistry是euraka server中注冊表管理的核心接口。職責是在內存中管理注冊到Eureka Server中的服務實例信息。實現類有PeerAwareInstanceRegistryImpl。

org.springframework.cloud.netflix.eureka.server.InstanceRegistry對PeerAwareInstanceRegistryImpl進行了繼承和擴展,使其適配Spring cloud的使用環境,主要的實現由PeerAwareInstanceRegistryImpl提供

com.netflix.eureka.registry.InstanceRegistry extends LeaseManager<InstanceInfo>, LookupService<String> 

LookupService 是提供服務實例的檢索查詢功能

LeaseManager 是對注冊到server中的服務實例租約進行管理,方法有:服務注冊,下線,續約,剔除等

PeerAwareInstanceRegistryImpl關鍵代碼如下:

  1. 服務注冊

public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
    	//具體注冊方法
        super.register(info, leaseDuration, isReplication);
        //注冊完成后同步到它的peer節點
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
//super.register 注冊核心邏輯
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            //構建注冊信息 Map集合 從當前注冊表查詢 若不存在重新創建
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            //修改數據保存到最近隊列 用於客戶端增量更新
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            //修改數據保存到最近隊列 用於客戶端增量更新
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }
  1. 心跳續約

public boolean renew(final String appName, final String id, final boolean isReplication) {			
    	//續約
        if (super.renew(appName, id, isReplication)) {
            //續約完成后同步到它的peer節點
            replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
            return true;
        }
        return false;
 }
//super.renew() 續約核算邏輯
public boolean renew(String appName, String id, boolean isReplication) {
        RENEW.increment(isReplication);
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> leaseToRenew = null;
        if (gMap != null) {
            leaseToRenew = gMap.get(id);
        }
        if (leaseToRenew == null) {
            RENEW_NOT_FOUND.increment(isReplication);
            logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
            return false;
        } else {
            InstanceInfo instanceInfo = leaseToRenew.getHolder();
            if (instanceInfo != null) {
                // touchASGCache(instanceInfo.getASGName());
                InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
                        instanceInfo, leaseToRenew, isReplication);
                if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) {
                    logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
                            + "; re-register required", instanceInfo.getId());
                    RENEW_NOT_FOUND.increment(isReplication);
                    return false;
                }
                if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
                    logger.info(
                            "The instance status {} is different from overridden instance status {} for instance {}. "
                                    + "Hence setting the status to overridden status", instanceInfo.getStatus().name(),
                                    instanceInfo.getOverriddenStatus().name(),
                                    instanceInfo.getId());
                    instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);

                }
            }
            renewsLastMin.increment();
            //更新租期 
            leaseToRenew.renew();
            return true;
        }
    }
//leaseToRenew.renew();
public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;

}
  1. 服務下線

public boolean cancel(final String appName, final String id,
                          final boolean isReplication) {
    	//下線邏輯
        if (super.cancel(appName, id, isReplication)) {
            //下線完成后同步到它的peer節點
            replicateToPeers(Action.Cancel, appName, id, null, null, isReplication);
            return true;
        }
        return false;
    }

public boolean cancel(String appName, String id, boolean isReplication) {
        return internalCancel(appName, id, isReplication);
    }
protected boolean internalCancel(String appName, String id, boolean isReplication) {
        try {
            read.lock();
            CANCEL.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                //從Map注冊表中移除當前刪除實例
                leaseToCancel = gMap.remove(id);
            }
            recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
            }
            if (leaseToCancel == null) {
                CANCEL_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                return false;
            } else {
                leaseToCancel.cancel();
                InstanceInfo instanceInfo = leaseToCancel.getHolder();
                String vip = null;
                String svip = null;
                if (instanceInfo != null) {
                    instanceInfo.setActionType(ActionType.DELETED);
                    recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                    instanceInfo.setLastUpdatedTimestamp();
                    vip = instanceInfo.getVIPAddress();
                    svip = instanceInfo.getSecureVipAddress();
                }
                invalidateCache(appName, vip, svip);
                logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            }
        } finally {
            read.unlock();
        }

        synchronized (lock) {
            if (this.expectedNumberOfClientsSendingRenews > 0) {
                // Since the client wants to cancel it, reduce the number of clients to send renews.
                this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1;
                updateRenewsPerMinThreshold();
            }
        }

        return true;
    }
  1. 服務剔除
public void evict(long additionalLeaseMs) {
        logger.debug("Running the evict task");
		//自我保護策略判斷 決定是否剔除服務 防止網絡分區問題
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }

        // We collect first all expired items, to evict them in random order. For large eviction sets,
        // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
        // the impact should be evenly distributed across all applications.
    	//遍歷服務注冊表獲取租約到期服務列表
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }

        // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
        // triggering self-preservation. Without that we would wipe out full registry.
        //獲取注冊表租約總數    
        int registrySize = (int) getLocalRegistrySize();
    	//計算注冊表租約的閾值 (總數乘以 續租百分比 默認85%),得出要續租的數量
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    	//理論要剔除的數量 = 總數-要續租的數量
        int evictionLimit = registrySize - registrySizeThreshold;
		//實際剔除的數量 =  min(實際租期到期服務實例個數,理論剔除數量)
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

            Random random = new Random(System.currentTimeMillis());
            for (int i = 0; i < toEvict; i++) {
                // Pick a random item (Knuth shuffle algorithm)
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
                internalCancel(appName, id, false);
            }
        }
    }

//自我保護判斷邏輯
public boolean isLeaseExpirationEnabled() {
    	//是否開通自我保護開關
        if (!isSelfPreservationModeEnabled()) {
            // The self preservation mode is disabled, hence allowing the instances to expire.
            return true;
        }
    	//續約數量 > 閥值 可以剔除
        return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
    }

自我保護觸發:

當每分鍾心跳次數( renewsLastMin ) 小於 numberOfRenewsPerMinThreshold 時,並且開啟自動保護模式開關( eureka.server.enable-self-preservation = true ) 時,觸發自我保護機制,不再自動過期租約。
numberOfRenewsPerMinThreshold = expectedNumberOfRenewsPerMin * 續租百分比( eureka.server.renewalPercentThreshold, 默認0.85 )
expectedNumberOfRenewsPerMin = 當前注冊的應用實例數 x 2

默認情況下,注冊的應用實例每半分鍾續租一次,那么一分鍾心跳兩次,因此 x 2 。

服務實例數:10個,期望每分鍾續約數:10 * 2=20,期望閾值:20*0.85=17,自我保護少於17時 觸發。

  1. 服務同步
private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info /* optional */,
                                  InstanceStatus newStatus /* optional */, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            // If it is a replication already, do not replicate again as this will create a poison replication
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                // If the url represents this host, do not replicate to yourself.
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                //具體同步方法
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }
  1. Eureka事件

    EurekaInstanceCanceledEvent 服務下線事件

    EurekaInstanceRegisteredEvent 服務注冊事件

    EurekaInstanceRenewedEvent 服務續約事件

    EurekaRegistryAvailableEvent 注冊中心可用事件

    EurekaServerStartedEvent 注冊中心啟動

注意:由於集群間的同步復制是通過HTTP的方式進行,基於網絡的不可靠性,集群中的Eureka Server間的注冊表信息難免存在不同步的時間節點,不滿足CAP中的C(數據一致性)。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM