spring-cloud-netflix-eureka source code analysis:
This article analyzes the source code of spring-cloud-dependencies Hoxton.SR4 and spring-cloud-starter-netflix-eureka-server 2.2.2.RELEASE.
If you have not worked with Eureka before, you can read https://www.cnblogs.com/wuzhenzhao/p/9466752.html first to pick up the basics.
The analysis covers the following topics:
- the service registration entry point
- initiating the registration request
- how Eureka Server handles the registration request
- data replication between Eureka Server nodes
- Eureka's multi-level cache design
- service renewal
- how Eureka Server handles renewal heartbeat requests
- service discovery
- Eureka's self-preservation mechanism
The entry point of service registration:
Service registration is initiated when the Spring Boot application starts. Spring Cloud is an ecosystem: it defines a set of abstractions that can be implemented by different components, covering service registration/discovery, circuit breaking, load balancing and so on. In the spring-cloud-commons module, under the package org.springframework.cloud.client.serviceregistry, there is a ServiceRegistry interface, which is Spring Cloud's abstraction for service registration. One of its implementations is EurekaServiceRegistry, meaning Eureka Server is used as the registry. Since my local project also pulls in Consul and Nacos, several implementations show up:
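For reference, the ServiceRegistry abstraction in spring-cloud-commons looks roughly like the sketch below (simplified; Javadoc and exact generic bounds are omitted):

public interface ServiceRegistry<R extends Registration> {

    // Register the given registration (the local service instance) with the registry.
    void register(R registration);

    // Remove the registration.
    void deregister(R registration);

    // Close the registry client.
    void close();

    // Update the status of the registration, e.g. UP or OUT_OF_SERVICE.
    void setStatus(R registration, String status);

    // Read the status back.
    <T> T getStatus(R registration);
}

EurekaServiceRegistry, ConsulServiceRegistry and NacosServiceRegistry each plug their own registry into this abstraction, which is why several implementations appear.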
When is registration actually initiated? It is not hard to guess: a service can only register itself once it has finished starting up. In Spring Boot, registration happens after the Spring container has started and all configuration has been applied, and this takes place in the refreshContext step of the Spring Boot startup method.
refreshContext eventually reaches AbstractApplicationContext#finishRefresh; you can trace the exact call chain yourself in the source, starting from SpringApplication.run().
Let's look at finishRefresh. As the name suggests, it performs the post-processing that runs once the context refresh has completed. It mainly does the following things:
protected void finishRefresh() {
    // Clear context-level resource caches (such as ASM metadata from scanning).
    clearResourceCaches();
    // Initialize the lifecycle processor for this context; it starts beans when Spring starts
    // and destroys them when Spring shuts down.
    initLifecycleProcessor();
    // Propagate refresh to the lifecycle processor first: start beans that implement Lifecycle.
    getLifecycleProcessor().onRefresh();
    // Publish the final ContextRefreshedEvent.
    publishEvent(new ContextRefreshedEvent(this));
    // Participate in the LiveBeansView MBean, if active (JMX monitoring and management).
    LiveBeansView.registerApplicationContext(this);
}
In this method we focus on getLifecycleProcessor().onRefresh(), which calls the lifecycle processor's onRefresh method: it finds all beans that implement the SmartLifecycle interface and calls their start method.
The subsequent call chain is: DefaultLifecycleProcessor.startBeans -> start() -> doStart() -> bean.start().
SmartLifecycle is an interface: after the Spring container has loaded and initialized all beans, it calls back into the beans that implement SmartLifecycle, for example their start() method.
Eureka's startup uses exactly this mechanism, and the class involved is EurekaAutoServiceRegistration. As the name tells us, this is the auto-configured service registration class, and it happens to implement SmartLifecycle.
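As a minimal illustration of the mechanism (a hypothetical bean, not Eureka code), any SmartLifecycle bean is started automatically once the context refresh finishes:

import org.springframework.context.SmartLifecycle;
import org.springframework.stereotype.Component;

@Component
public class DemoLifecycleBean implements SmartLifecycle {

    private volatile boolean running = false;

    @Override
    public void start() {
        // Called by DefaultLifecycleProcessor.onRefresh() after all beans have been initialized.
        running = true;
        System.out.println("DemoLifecycleBean started");
    }

    @Override
    public void stop() {
        running = false;
    }

    @Override
    public boolean isRunning() {
        return running;
    }
}

EurekaAutoServiceRegistration plays exactly this role: its start() method is where registration is kicked off.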
Service registration:
With the SmartLifecycle background covered, bean.start() here ends up calling the start method of EurekaAutoServiceRegistration, because it clearly implements the SmartLifecycle interface.
@Override public void start() { // only set the port if the nonSecurePort or securePort is 0 and this.port != 0 if (this.port.get() != 0) { if (this.registration.getNonSecurePort() == 0) { this.registration.setNonSecurePort(this.port.get()); } if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) { this.registration.setSecurePort(this.port.get()); } } // only initialize if nonSecurePort is greater than 0 and it isn't already running // because of containerPortInitializer below if (!this.running.get() && this.registration.getNonSecurePort() > 0) { this.serviceRegistry.register(this.registration); this.context.publishEvent(new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig())); this.running.set(true); } }
In start() we can see the call to this.serviceRegistry.register, which is what actually initiates registration. Here this.serviceRegistry is an EurekaServiceRegistry: it is assigned in the constructor of EurekaAutoServiceRegistration, and that bean is assembled and initialized in the EurekaClientAutoConfiguration auto-configuration class. (Tip: if you cannot find where a class is instantiated, use IDEA's Find Usages.)
this.serviceRegistry.register(this.registration) ultimately calls the register method of the EurekaServiceRegistry class to perform the registration.
@Override
public void register(EurekaRegistration reg) {
    maybeInitializeClient(reg);
    if (log.isInfoEnabled()) {
        log.info("Registering application "
                + reg.getApplicationInfoManager().getInfo().getAppName()
                + " with eureka with status "
                + reg.getInstanceConfig().getInitialStatus());
    }
    // Set the status of the current instance; whenever the status changes (to anything other
    // than DOWN) the status listener is notified and the actual registration is performed.
    reg.getApplicationInfoManager()
            .setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
    // Set up the health check handler
    reg.getHealthCheckHandler().ifAvailable(healthCheckHandler -> reg
            .getEurekaClient().registerHealthCheck(healthCheckHandler));
}
From the code above, the register method does not actually call into Eureka to perform the registration; it merely sets the instance status and the health check handler.
So what is the reg.getApplicationInfoManager() instance used in EurekaServiceRegistry.register? ApplicationInfoManager is a field of EurekaRegistration, and EurekaRegistration is the object handed to EurekaAutoServiceRegistration. That made me wonder whether something is set up during auto-configuration, so I opened EurekaClientAutoConfiguration and, sure enough, found a series of bean definitions there, including EurekaClient, ApplicationInfoManager, EurekaRegistration and so on.
@Configuration(proxyBeanMethods = false) @ConditionalOnMissingRefreshScope protected static class EurekaClientConfiguration { @Autowired private ApplicationContext context; @Autowired private AbstractDiscoveryClientOptionalArgs<?> optionalArgs; @Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT) public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) { return new CloudEurekaClient(manager, config, this.optionalArgs, this.context); } @Bean @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT) public ApplicationInfoManager eurekaApplicationInfoManager( EurekaInstanceConfig config) { InstanceInfo instanceInfo = new InstanceInfoFactory().create(config); return new ApplicationInfoManager(config, instanceInfo); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) @ConditionalOnProperty( value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager, @Autowired( required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) { return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager) .with(eurekaClient).with(healthCheckHandler).build(); } }
It is easy to spot that a very important bean is auto-configured at startup: CloudEurekaClient. From the name alone we can recognize and guess that it is the Eureka client component used to communicate with the server. It follows a pattern common to a lot of source code: either the constructor does a lot of initialization and starts background work, or everything is handled through asynchronous events.
Next, let's look at how CloudEurekaClient is initialized. Its constructor calls super, i.e. the constructor of DiscoveryClient. The final DiscoveryClient constructor is very long, but most of the code is initialization work we don't need to care about, for example it sets up several scheduled tasks:
- heartbeatExecutor: the heartbeat timer task
- cacheRefreshExecutor: a timer task that periodically syncs the instance list from the server
- initScheduledTasks(), which:
  - sets up the heartbeat mechanism
  - periodically reports the service status
  - instantiates the StatusChangeListener instance-status listener through an anonymous inner class
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { // .......構建本服務實例信息 等等操作 // 是否要從eureka server上獲取服務地址信息 if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } //是否要注冊到eureka server上 if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } logger.info("Initializing Eureka in region {}", clientConfig.getRegion()); //如果不需要注冊、也不需要拉取服務地址,就到這里結束 if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); return; // no need to setup up an network tasks and we are done } try { // default size of 2 - 1 each for heartbeat and cacheRefresh // 開啟任務線程池,分別給下面兩個 scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { 
this.preRegistrationHandler.beforeRegistration(); } //如果需要注冊到Eureka server並且是開啟了初始化的時候強制注冊,則調用register()發起服務注冊,默認是不開啟的 if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { if (!register() ) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) { logger.error("Registration error at startup: {}", th.getMessage()); throw new IllegalStateException(th); } } // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); }
From the code above, the constructor finally calls initScheduledTasks() to start the scheduled tasks:
- If fetching the registry from the server is enabled, the cacheRefreshExecutor timer task is started.
- If registering with Eureka is enabled, several things need to happen:
  - set up the heartbeat mechanism
  - instantiate the StatusChangeListener instance-status listener through an inner class; this is where the reg.getApplicationInfoManager().setInstanceStatus call we saw earlier takes effect, because setting the status ends up calling the listener's notify method.
private void initScheduledTasks() { //如果配置了開啟從注冊中心刷新服務列表,則會開啟cacheRefreshExecutor這個定時任務 if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); cacheRefreshTask = new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ); scheduler.schedule( cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS); } //如果開啟了服務注冊到Eureka,則通過需要做幾個事情 if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer 開啟心跳檢測 heartbeatTask = new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ); scheduler.schedule( heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator 實例同步 instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; //注冊實例狀態變化的監聽 if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } //啟動一個實例信息復制器,主要就是為了開啟一個定時線程,每40秒判斷實例信息是否變更,如果變更了則重新注冊 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
instanceInfoReplicator.onDemandUpdate(): the main purpose of this method is to push the instance data to the registry whenever the local instance information has changed.
public boolean onDemandUpdate() { //限流判斷 if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) { if (!scheduler.isShutdown()) { //提交一個任務 scheduler.submit(new Runnable() { @Override public void run() { logger.debug("Executing on-demand update of local InstanceInfo"); //取出之前已經提交的任務,也就是在start方法中提交的更新任務,如果任務還沒有執行完成,則取消之前的任務。 Future latestPeriodic = scheduledPeriodicRef.get(); if (latestPeriodic != null && !latestPeriodic.isDone()) { logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update"); latestPeriodic.cancel(false);//如果此任務未完成,就立即取消 } //通過調用run方法,令任務在延時后執行,相當於周期性任務中的一次 InstanceInfoReplicator.this.run(); } }); return true; } else { logger.warn("Ignoring onDemand update due to stopped scheduler"); return false; } } else { logger.warn("Ignoring onDemand update due to rate limiter"); return false; } }
Then we enter InstanceInfoReplicator#run:
public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
Finally, we have found the entry point for service registration: DiscoveryClient.register.
boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode(); }
At this point we need to understand what the eurekaTransport.registrationClient object actually is, and to do that we first need to know what eurekaTransport.transportClientFactory is.
Inside the DiscoveryClient constructor, a method called scheduleServerEndpointTask is invoked, and that method initializes eurekaTransport.
private void scheduleServerEndpointTask(EurekaTransport eurekaTransport, AbstractDiscoveryClientOptionalArgs args) { // ....... // Ignore the raw types warnings since the client filter interface changed between jersey 1/2 @SuppressWarnings("rawtypes") TransportClientFactories transportClientFactories = argsTransportClientFactories == null ? new Jersey1TransportClientFactories() : argsTransportClientFactories; Optional<SSLContext> sslContext = args == null ? Optional.empty() : args.getSSLContext(); Optional<HostnameVerifier> hostnameVerifier = args == null ? Optional.empty() : args.getHostnameVerifier(); // If the transport factory was not supplied with args, assume they are using jersey 1 for passivity eurekaTransport.transportClientFactory = providedJerseyClient == null ? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo(), sslContext, hostnameVerifier) : transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient); // ......... }
We can see that argsTransportClientFactories is null at this point, so a Jersey1TransportClientFactories is created, and Jersey1TransportClientFactories.newTransportClientFactory is then called:
@Override public TransportClientFactory newTransportClientFactory(EurekaClientConfig clientConfig, Collection<ClientFilter> additionalFilters, InstanceInfo myInstanceInfo, Optional<SSLContext> sslContext, Optional<HostnameVerifier> hostnameVerifier) { final TransportClientFactory jerseyFactory = JerseyEurekaHttpClientFactory.create( clientConfig, additionalFilters, myInstanceInfo, new EurekaClientIdentity(myInstanceInfo.getIPAddr()), sslContext, hostnameVerifier ); final TransportClientFactory metricsFactory = MetricsCollectingEurekaHttpClient.createFactory(jerseyFactory); return new TransportClientFactory() { @Override public EurekaHttpClient newClient(EurekaEndpoint serviceUrl) { return metricsFactory.newClient(serviceUrl); } @Override public void shutdown() { metricsFactory.shutdown(); jerseyFactory.shutdown(); } }; }
This in turn calls MetricsCollectingEurekaHttpClient.createFactory(jerseyFactory):
public static TransportClientFactory createFactory(final TransportClientFactory delegateFactory) { final Map<RequestType, EurekaHttpClientRequestMetrics> metricsByRequestType = initializeMetrics(); final ExceptionsMetric exceptionMetrics = new ExceptionsMetric(EurekaClientNames.METRIC_TRANSPORT_PREFIX + "exceptions"); return new TransportClientFactory() { @Override public EurekaHttpClient newClient(EurekaEndpoint endpoint) { return new MetricsCollectingEurekaHttpClient( delegateFactory.newClient(endpoint), //請牢記,這個delegateFactory 是上面傳進來的 JerseyEurekaHttpClientFactory metricsByRequestType, exceptionMetrics, false ); } @Override public void shutdown() { shutdownMetrics(metricsByRequestType); exceptionMetrics.shutdown(); } }; }
So now we know that eurekaTransport.transportClientFactory is the anonymous object shown in the code below, while metricsFactory is the anonymous inner class returned by the code above.
new TransportClientFactory() { @Override public EurekaHttpClient newClient(EurekaEndpoint serviceUrl) { return metricsFactory.newClient(serviceUrl); } @Override public void shutdown() { metricsFactory.shutdown(); jerseyFactory.shutdown(); } };
Now let's go back to another piece of code in the scheduleServerEndpointTask method:
if (clientConfig.shouldRegisterWithEureka()) { EurekaHttpClientFactory newRegistrationClientFactory = null; EurekaHttpClient newRegistrationClient = null; try { newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory( eurekaTransport.bootstrapResolver, eurekaTransport.transportClientFactory, transportConfig ); newRegistrationClient = newRegistrationClientFactory.newClient(); } catch (Exception e) { logger.warn("Transport initialization failure", e); } eurekaTransport.registrationClientFactory = newRegistrationClientFactory; eurekaTransport.registrationClient = newRegistrationClient; }
The object built by EurekaHttpClients.registrationClientFactory is again an anonymous inner class just like the one above, and newRegistrationClientFactory.newClient() returns a SessionedEurekaHttpClient.
Going back to DiscoveryClient#register, we don't see SessionedEurekaHttpClient anywhere, so let's look at its class diagram.
Its parent class is EurekaHttpClientDecorator; let's step into it:
@Override public EurekaHttpResponse<Void> register(final InstanceInfo info) { return execute(new RequestExecutor<Void>() { @Override public EurekaHttpResponse<Void> execute(EurekaHttpClient delegate) { return delegate.register(info); } @Override public RequestType getRequestType() { return RequestType.Register; } }); }
The call then passes through the decorator chain, in order SessionedEurekaHttpClient ---> RetryableEurekaHttpClient ---> RedirectingEurekaHttpClient ---> MetricsCollectingEurekaHttpClient, and finally reaches AbstractJerseyEurekaHttpClient#register.
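To make the decorator chain easier to picture, here is a minimal, self-contained sketch of the same pattern (the interface and classes are hypothetical, not the real Eureka transport types); each wrapper adds one concern and then delegates to the next client:

// A stripped-down illustration of the decorator idea behind the EurekaHttpClient chain.
interface HttpClient {
    int register(String instanceInfo);
}

class JerseyClient implements HttpClient {
    // The "real" client at the end of the chain, analogous to AbstractJerseyEurekaHttpClient.
    public int register(String instanceInfo) {
        System.out.println("POST apps/... with " + instanceInfo);
        return 204;
    }
}

class RetryingClient implements HttpClient {
    // One decorator: adds retries, then delegates, analogous to RetryableEurekaHttpClient.
    private final HttpClient delegate;
    RetryingClient(HttpClient delegate) { this.delegate = delegate; }
    public int register(String instanceInfo) {
        for (int attempt = 0; attempt < 3; attempt++) {
            int status = delegate.register(instanceInfo);
            if (status == 204) {
                return status;
            }
        }
        return 500;
    }
}

public class DecoratorDemo {
    public static void main(String[] args) {
        // The wrapping order mirrors Sessioned -> Retryable -> ... -> Jersey in Eureka.
        HttpClient client = new RetryingClient(new JerseyClient());
        client.register("my-service");
    }
}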
You really need to step through this part of the code yourself; if you get stuck, set a breakpoint and follow it:
@Override public EurekaHttpResponse<Void> register(InstanceInfo info) { String urlPath = "apps/" + info.getAppName(); ClientResponse response = null; try { Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); response = resourceBuilder .header("Accept-Encoding", "gzip") .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) .post(ClientResponse.class, info); return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }
Clearly this issues an HTTP request to Eureka Server's apps/${APP_NAME} endpoint, sending the current service instance's information to Eureka Server to be stored. At this point we basically know how Spring Cloud Eureka registers the service information with Eureka Server at startup.
To summarize, when the Eureka client initiates registration there are two places where the registration task is executed:
- When Spring Boot starts, auto-configuration registers CloudEurekaClient in the container and runs its constructor; the constructor starts a scheduled task that periodically checks whether the instance information has changed and, if it has, initiates the registration flow (InstanceInfoReplicator#start).
- Also during Spring Boot startup, the refresh step eventually triggers StatusChangeListener.notify, which listens for instance status changes; when the listener receives the event it performs the registration (InstanceInfoReplicator#onDemandUpdate).
The flow of the registration entry point and of initiating the registration is shown in the diagram below:
How Eureka Server handles the registration request:
We can be sure that it stores the instance data carried by the request. Let's look at the server-side processing; the entry point is com.netflix.eureka.resources.ApplicationResource.addInstance().
You will notice that the REST endpoints here are implemented with Jersey. Jersey is an implementation of the JAX-RS standard for building REST services; it is worth studying on its own if you are interested.
When the Eureka client calls register, the request lands in ApplicationResource.addInstance: registration is simply a POST request carrying the current instance information to the addInstance method of the ApplicationResource class.
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
                            @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    // ........ validation code omitted
    registry.register(info, "true".equals(isReplication));
    return Response.status(204).build(); // 204 to be backwards compatible
}
registry.register has multiple implementations, so let's look at the class diagram.
From the class diagram, the top-level interfaces of PeerAwareInstanceRegistry are LeaseManager and LookupService: LookupService defines the most basic behaviour for looking up instances, while LeaseManager defines the handling of client registration, renewal, cancellation and eviction.
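For orientation, the two top-level interfaces look roughly like this (a simplified sketch recalled from the Netflix Eureka sources; the method sets are trimmed and exact signatures may differ slightly):

// LeaseManager: lifecycle of the lease held by a registered instance.
public interface LeaseManager<T> {
    void register(T r, int leaseDuration, boolean isReplication);
    boolean cancel(String appName, String id, boolean isReplication);
    boolean renew(String appName, String id, boolean isReplication);
    void evict();
}

// LookupService: querying the registered applications and instances.
public interface LookupService<T> {
    Application getApplication(String appName);
    Applications getApplications();
    List<InstanceInfo> getInstancesById(String id);
}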
In addInstance, what ultimately gets called is the PeerAwareInstanceRegistryImpl.register method.
- leaseDuration is the lease expiry time, 90s by default: if the server has not received a heartbeat from a client for more than 90s, it actively evicts that instance.
- super.register is called to perform the actual node registration.
- The information is then replicated to the other machines in the Eureka Server cluster; the replication is straightforward: get all the nodes in the cluster and send the registration to each of them.
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        // If the client defines its own heartbeat timeout, use the client's value
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    // Register the node locally
    super.register(info, leaseDuration, isReplication);
    // Replicate to the other nodes in the Eureka Server cluster
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
AbstractInstanceRegistry#register: simply put, service registration on the Eureka Server side means saving the instance data sent by the client into a ConcurrentHashMap inside Eureka Server.
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { read.lock(); //從registry中獲得當前實例信息,根據appName Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); REGISTER.increment(isReplication);//增加注冊次數到監控信息中 if (gMap == null) {//如果當前appName是第一次注冊,則初始化一個 ConcurrentHashMap final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } //從gMap中查詢已經存在的Lease信息,Lease中文翻譯為租約,實際上它把服務提供者的實例信息包裝成了一個lease,里面提供了對於改服務實例的租約管理 Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); // Retain the last dirty timestamp without overwriting it, if there is already a lease // 當instance已經存在是,和客戶端的instance的信息做比較,時間最新的那個,為有效instance信息 if (existingLease != null && (existingLease.getHolder() != null)) { Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted // InstanceInfo instead of the server local copy. if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); registrant = existingLease.getHolder(); } } else { // The lease does not exist and hence it is a new registration synchronized (lock) {//當lease不存在時,進入到這段代碼, if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to register it, increase the number of clients sending renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1; updateRenewsPerMinThreshold(); } } logger.debug("No previous lease information found; it is new registration"); } //構建一個lease Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); if (existingLease != null) { // 當原來存在Lease的信息時,設置serviceUpTimestamp, 保證服務啟動的時間一直是第一次注冊的那個 lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } gMap.put(registrant.getId(), lease); recentRegisteredQueue.add(new Pair<Long, String>(//添加到最近注冊的隊列中 System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); // This is where the initial state transfer of overridden status happens // 檢查實例狀態是否發生變化,如果是並且存在,則覆蓋原來的狀態 if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. 
Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } // Set the status based on the overridden status rules InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // 得到instanceStatus,判斷是否是UP狀態, // If the lease is registered with UP status, set lease service up timestamp if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } // 設置注冊類型為添加 registrant.setActionType(ActionType.ADDED); // 租約變更記錄隊列,記錄了實例的每次變化, 用於注冊信息的增量獲取 recentlyChangedQueue.add(new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); //讓緩存失效 invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); } }
At this point we have analyzed both the client-side and server-side handling of service registration in detail. On the Eureka Server side, the client's address information is stored in a ConcurrentHashMap, and a heartbeat mechanism is established between the service provider and the registry to monitor the provider's health.
Data replication between Eureka Server nodes:
After handling the registration, Eureka Server replicates the data to the other nodes. Since an Eureka cluster has no notion of a leader, let's see how this is done:
private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // If it is a replication already, do not replicate again as this will create a poison replication if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } // Eureka Server 可以拿到其他節點的信息,是我們配置好的,然后遍歷 for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself. if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; //是自己 跳過 } replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } }
In plain terms, replicating the service information just means iterating over the peer nodes and registering with each of them; a rough sketch of what replicateInstanceActionsToPeers does for a Register action follows.
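The code below is a simplified, hypothetical rendering of replicateInstanceActionsToPeers for the Register and Cancel actions (the real method also handles Heartbeat, StatusUpdate and DeleteStatusOverride, and does more bookkeeping around metrics and timestamps):

private void replicateInstanceActionsToPeers(Action action, String appName, String id,
                                             InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        switch (action) {
            case Register:
                // POST the instance info to the peer; the peer sees it as a replication request
                node.register(info);
                break;
            case Cancel:
                node.cancel(appName, id);
                break;
            default:
                break;
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}",
                node.getServiceUrl(), action.name(), t);
    }
}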
How Eureka Server stores registrations (the multi-level cache design):
Eureka Server keeps service registration information in three structures: registry, readWriteCacheMap and readOnlyCacheMap. By default, a scheduled task syncs readWriteCacheMap into readOnlyCacheMap every 30s, another task runs every 60s to evict nodes that have not renewed for more than 90s, Eureka clients refresh their registry information from readOnlyCacheMap every 30s, and client registrations are written to registry. This avoids the problems concurrency would otherwise cause and implements an in-memory read/write separation.
registry stores all the registered instances and lives in the AbstractInstanceRegistry class, as shown below:
public abstract class AbstractInstanceRegistry implements InstanceRegistry {
    // The cache of registered instances: appName -> (instanceId -> lease)
    private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
            = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
    // ...
}
readWriteCacheMap and readOnlyCacheMap, the read/write-separated in-memory storage, live in the ResponseCacheImpl class:
public class ResponseCacheImpl implements ResponseCache {
    // the read-only cache
    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
    // the read-write cache
    private final LoadingCache<Key, Value> readWriteCacheMap;
    // .......
}
Why a multi-level cache:
Why design a multi-level cache here? The reason is simple: under large-scale registration and update traffic, if everything went through a single ConcurrentHashMap the locking would inevitably cause contention and hurt performance. Eureka follows the AP model and only needs eventual availability, so it uses a multi-level cache to separate reads from writes. The register path writes directly to the in-memory registry and then actively invalidates the read-write cache. The path that serves registry queries first reads the read-only cache; on a miss it falls back to the read-write cache; and if that also misses, it loads from the in-memory registry (which is more involved than a plain read). In addition, the read-write cache is periodically written back into the read-only cache. A sketch of this read path is shown below.
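The following is a minimal sketch of that read path, with simplified types and no metrics; the real ResponseCacheImpl.getValue also deals with gzip payloads and region-specific keys:

Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        if (useReadOnlyCache) {
            final Value currentPayload = readOnlyCacheMap.get(key);
            if (currentPayload != null) {
                // 1. hit in the read-only cache
                payload = currentPayload;
            } else {
                // 2. miss: read the Guava LoadingCache, whose loader calls
                //    generatePayload(key), i.e. reads the in-memory registry
                payload = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, payload);
            }
        } else {
            // read-only cache disabled: go straight to the LoadingCache
            payload = readWriteCacheMap.get(key);
        }
    } catch (Throwable t) {
        logger.error("Cannot get value for key : {}", key, t);
    }
    return payload;
}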
- responseCacheUpdateIntervalMs: the interval of the timer that refreshes readOnlyCacheMap, 30 seconds by default.
- responseCacheAutoExpirationInSeconds: the expiry time of entries in readWriteCacheMap, 180 seconds by default.
Cache invalidation on registration:
At the end of the AbstractInstanceRegistry.register method, invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()) is called to invalidate the read-write cache; a simplified sketch of that invalidation follows.
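Roughly speaking, the invalidation builds the cache keys affected by the application and evicts them from the Guava readWriteCacheMap. The sketch below is simplified and partly hypothetical (keysFor is an invented helper standing in for the key-building logic; the real ResponseCacheImpl.invalidate also evicts the VIP/SVIP, ALL_APPS and ALL_APPS_DELTA keys for every response format):

public void invalidate(String appName, String vipAddress, String secureVipAddress) {
    // keysFor is hypothetical: it would build every cache key touched by this application.
    for (Key key : keysFor(appName, vipAddress, secureVipAddress)) {
        // Evict the stale payload; readers reload it lazily through the LoadingCache.
        readWriteCacheMap.invalidate(key);
    }
}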
Periodically syncing the caches:
The constructor of ResponseCacheImpl starts a scheduled task that periodically checks the read-write cache for changes and syncs them into the read-only cache.
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) { // ....... // this.readWriteCacheMap = CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache()) .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS) .removalListener(new RemovalListener<Key, Value>() { @Override public void onRemoval(RemovalNotification<Key, Value> notification) { Key removedKey = notification.getKey(); if (removedKey.hasRegions()) { Key cloneWithNoRegions = removedKey.cloneWithoutRegions(); regionSpecificKeys.remove(cloneWithNoRegions, removedKey); } } }) .build(new CacheLoader<Key, Value>() { @Override public Value load(Key key) throws Exception { if (key.hasRegions()) { Key cloneWithNoRegions = key.cloneWithoutRegions(); regionSpecificKeys.put(cloneWithNoRegions, key); } Value value = generatePayload(key); return value; } }); if (shouldUseReadOnlyResponseCache) { timer.schedule(getCacheUpdateTask(), new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs); } // ....... }
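The task scheduled here, getCacheUpdateTask(), essentially walks the keys currently present in the read-only cache and refreshes any entry whose value differs from the read-write cache. A simplified sketch (the real task also logs debug output per key):

private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            for (Key key : readOnlyCacheMap.keySet()) {
                try {
                    // current value in the read-write cache (loaded on demand)
                    Value cacheValue = readWriteCacheMap.get(key);
                    // value the clients currently see in the read-only cache
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    if (cacheValue != currentCacheValue) {
                        // propagate the newer value to readers
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the read-only cache for key {}", key, th);
                }
            }
        }
    };
}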
Finally, here is a flow chart of how the server side handles a registration request:
Service renewal:
Service renewal is essentially a heartbeat mechanism: the client periodically sends heartbeats to renew its lease. During client initialization, inside the DiscoveryClient constructor, initScheduledTasks creates the heartbeat timer task:
// Heartbeat timer heartbeatTask = new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, // 30 TimeUnit.SECONDS, expBackOffBound, // 10 new HeartbeatThread() ); scheduler.schedule( heartbeatTask, renewalIntervalInSecs, // 30 TimeUnit.SECONDS);
HeartbeatThread: this timer task runs a HeartbeatThread, which periodically calls renew() to renew the lease.
/** * The heartbeat task that renews the lease in the given intervals. */ private class HeartbeatThread implements Runnable { public void run() { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } } boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == Status.OK.getStatusCode(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } } @Override public EurekaHttpResponse<InstanceInfo> sendHeartBeat(String appName, String id, InstanceInfo info, InstanceStatus overriddenStatus) { String urlPath = "apps/" + appName + '/' + id; ClientResponse response = null; try { WebResource webResource = jerseyClient.getClient().resource(serviceUrl) .path(urlPath) .queryParam("status", info.getStatus().toString()) .queryParam("lastDirtyTimestamp", info.getLastDirtyTimestamp().toString()); if (overriddenStatus != null) { webResource = webResource.queryParam("overriddenstatus", overriddenStatus.name()); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).put(ClientResponse.class); InstanceInfo infoFromPeer = null; if (response.getStatus() == Status.CONFLICT.getStatusCode() && response.hasEntity()) { infoFromPeer = response.getEntity(InstanceInfo.class); } return anEurekaHttpResponse(response.getStatus(), infoFromPeer).type(MediaType.APPLICATION_JSON_TYPE).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("[heartbeat] Jersey HTTP PUT {}; statusCode={}", urlPath, response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }
How the server handles the heartbeat request:
The code above clearly sends a PUT request to the path "apps/" + appName + '/' + id (on the server, the heartbeat itself is handled by InstanceResource.renewLease, which calls registry.renew). ApplicationResource.getInstanceInfo returns an InstanceResource for that id, and the same resource also defines a statusUpdate endpoint for updating the instance status, which we walk through below.
@Path("{id}") public InstanceResource getInstanceInfo(@PathParam("id") String id) { return new InstanceResource(this, id, serverConfig, registry); }
InstanceResource.statusUpdate(): in this method we focus on registry.statusUpdate, which calls AbstractInstanceRegistry.statusUpdate to update the information stored on the server for the given service provider.
@PUT @Path("status") public Response statusUpdate( @QueryParam("value") String newStatus, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { try {// 通過服務名稱獲取服務集合 if (registry.getInstanceByAppAndId(app.getName(), id) == null) { logger.warn("Instance not found: {}/{}", app.getName(), id); return Response.status(Status.NOT_FOUND).build(); }//修改狀態 boolean isSuccess = registry.statusUpdate(app.getName(), id, InstanceStatus.valueOf(newStatus), lastDirtyTimestamp, "true".equals(isReplication)); if (isSuccess) { logger.info("Status updated: {} - {} - {}", app.getName(), id, newStatus); return Response.ok().build(); } else { logger.warn("Unable to update status: {} - {} - {}", app.getName(), id, newStatus); return Response.serverError().build(); } } catch (Throwable e) { logger.error("Error updating instance {} for status {}", id, newStatus); return Response.serverError().build(); } }
AbstractInstanceRegistry.statusUpdate: this method looks up the lease for the instance and then calls Lease.renew() to renew the heartbeat.
@Override
public boolean statusUpdate(String appName, String id, InstanceStatus newStatus,
                            String lastDirtyTimestamp, boolean isReplication) {
    try {
        read.lock();
        // Count this status update in the monitoring statistics
        STATUS_UPDATE.increment(isReplication);
        // Look up the instance in the local registry data
        Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
        Lease<InstanceInfo> lease = null;
        if (gMap != null) {
            lease = gMap.get(id);
        }
        // If the instance does not exist, return false right away
        if (lease == null) {
            return false;
        } else {
            // renew() mainly updates the lease's last-update timestamp
            lease.renew();
            // Get the InstanceInfo held by the lease
            InstanceInfo info = lease.getHolder();
            // Lease is always created with its instance info object.
            // This log statement is provided as a safeguard, in case this invariant is violated.
            if (info == null) {
                logger.error("Found Lease without a holder for instance id {}", id);
            }
            // If the instance info exists and its status has changed
            if ((info != null) && !(info.getStatus().equals(newStatus))) {
                // Mark service as UP if needed
                if (InstanceStatus.UP.equals(newStatus)) {
                    // If the new status is UP, call serviceUp(), which mainly records the service-up timestamp
                    lease.serviceUp();
                }
                // This is NAC overridden status
                // Put the instanceId -> status mapping into the overridden-status map
                overriddenInstanceStatusMap.put(id, newStatus);
                // Set it for transfer of overridden status to replica on replica start up
                // Record the overridden status on the instance info itself
                info.setOverriddenStatus(newStatus);
                long replicaDirtyTimestamp = 0;
                info.setStatusWithoutDirty(newStatus);
                if (lastDirtyTimestamp != null) {
                    replicaDirtyTimestamp = Long.valueOf(lastDirtyTimestamp);
                }
                // If the replication's dirty timestamp is more than the existing one,
                // just update it to the replica's.
                if (replicaDirtyTimestamp > info.getLastDirtyTimestamp()) {
                    info.setLastDirtyTimestamp(replicaDirtyTimestamp);
                }
                info.setActionType(ActionType.MODIFIED);
                recentlyChangedQueue.add(new RecentlyChangedItem(lease));
                info.setLastUpdatedTimestamp();
                // Invalidate the read-write cache
                invalidateCache(appName, info.getVIPAddress(), info.getSecureVipAddress());
            }
            return true;
        }
    } finally {
        read.unlock();
    }
}
That completes the analysis of heartbeat renewal. The flow chart is as follows:
Service discovery:
Let's continue with the service discovery process. The client needs to provide two capabilities:
- fetch the address list of the service providers at startup
- dynamically pick up changes when the addresses on the Eureka server side change
Fetching during DiscoveryClient construction: in the constructor, if fetchRegistry is enabled for the client (it is by default), data is pulled from the eureka-server.
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { // ........ if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } // ...... }
DiscoveryClient.fetchRegistry:
private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // If the delta is disabled or if it is the first time, get all // applications 從本地緩存 localRegionApps.get() 服務實例信息 Applications applications = getApplications(); //1. 是否禁用增量更新; //2. 是否對某個region特別關注; //3. 外部調用時是否通過入參指定全量更新; //4. 本地還未緩存有效的服務列表信息 if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch || (applications == null) //如果服務列表為空 || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta()); logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress()); logger.info("Force full registry fetch : {}", forceFullRegistryFetch); logger.info("Application is null : {}", (applications == null)); logger.info("Registered Applications size is zero : {}", (applications.getRegisteredApplications().size() == 0)); logger.info("Application version is -1: {}", (applications.getVersion() == -1)); getAndStoreFullRegistry(); //全量拉取 } else { getAndUpdateDelta(applications);//增量拉取 } applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e); return false; } finally { if (tracer != null) { tracer.stop(); } } // Notify about cache refresh before updating the instance remote status onCacheRefreshed();//刷新環迅 // Update remote status based on refreshed data held in the cache updateInstanceRemoteStatus();//更新服務實例狀態 // registry was fetched successfully, so return true return true; }
Stepping further in, you can see the corresponding endpoints: they are exposed by the ApplicationResource and ApplicationsResource classes, which we will get to below.
A scheduled task updates the local address list every 30s. When DiscoveryClient is constructed it initializes several tasks, as analyzed earlier; one of them, called cacheRefreshTask, dynamically updates the local service address list. This task ultimately runs the CacheRefreshThread and executes periodically; let's take a closer look.
private void initScheduledTasks() { if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); cacheRefreshTask = new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds,//30 TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ); scheduler.schedule( cacheRefreshTask, registryFetchIntervalSeconds,// 30 TimeUnit.SECONDS); } //....... }
TimedSupervisorTask: overall, TimedSupervisorTask is a periodic task with a fixed interval. Whenever an execution times out, the interval for the next cycle is increased; if executions keep timing out, the interval doubles each time until it reaches the upper bound set by the caller. Once a new execution no longer times out, the interval automatically resets to its initial value. This design is well worth learning from.
@Override public void run() { Future<?> future = null; try { //使用Future,可以設定子線程的超時時間,這樣當前線程就不用無限等待了 future = executor.submit(task); threadPoolLevelGauge.set((long) executor.getActiveCount()); //指定等待子線程的最長時間 future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout delay.set(timeoutMillis);//delay是個很有用的變量,后面會用到,這里記得每次執行任務成功都會將delay重置 threadPoolLevelGauge.set((long) executor.getActiveCount()); successCounter.increment(); } catch (TimeoutException e) { logger.warn("task supervisor timed out", e); timeoutCounter.increment(); long currentDelay = delay.get(); long newDelay = Math.min(maxDelay, currentDelay * 2); //設置為最新的值,考慮到多線程,所以用了CAS delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException e) { //一旦線程池的阻塞隊列中放滿了待處理任務,觸發了拒絕策略,就會將調度器停掉 if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, reject the task", e); } else { logger.warn("task supervisor rejected the task", e); } rejectedCounter.increment(); } catch (Throwable e) {//一旦出現未知的異常,就停掉調度器 if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, can't accept the task"); } else { logger.warn("task supervisor threw an exception", e); } throwableCounter.increment(); } finally { if (future != null) {//這里任務要么執行完畢,要么發生異常,都用cancel方法來清理任務; future.cancel(true); } //只要調度器沒有停止,就再指定等待時間之后在執行一次同樣的任務 if (!scheduler.isShutdown()) { //這里就是周期性任務的原因:只要沒有停止調度器,就再創建一次性任務,執行時間時dealy的值, //假設外部調用時傳入的超時時間為30秒(構造方法的入參timeout),最大間隔時間為50秒(構造方法的入參expBackOffBound) //如果最近一次任務沒有超時,那么就在30秒后開始新任務, //如果最近一次任務超時了,那么就在50秒后開始新任務(異常處理中有個乘以二的操作,乘以二后的60秒超過了最大間隔50秒) scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } } }
The CacheRefreshThread submitted via executor.submit(task) ends up calling refreshRegistry, which does two main things:
- checks whether remoteRegions has changed
- calls fetchRegistry to refresh the local service address cache
@VisibleForTesting
void refreshRegistry() {
    try {
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();
        boolean remoteRegionsModified = false;
        // This makes sure that a dynamic change to remote regions to fetch is honored.
        // If deployed on AWS, compare the remote regions used for the last fetch with the
        // current configuration and update them if they differ.
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        if (null != latestRemoteRegions) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                        String[] remoteRegions = latestRemoteRegions.split(",");
                        remoteRegionsRef.set(remoteRegions);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                        remoteRegionsModified = true;
                    } else {
                        logger.info("Remote regions to fetch modified concurrently,"
                                + " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                    }
                }
            } else {
                // Just refresh mapping to reflect any DNS/Property change
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            }
        }
        // Fetch the service addresses
        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }
        if (logger.isDebugEnabled()) {
            StringBuilder allAppsHashCodes = new StringBuilder();
            allAppsHashCodes.append("Local region apps hashcode: ");
            allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
            allAppsHashCodes.append(", is fetching remote regions? ");
            allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
            for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                allAppsHashCodes.append(", Remote region: ");
                allAppsHashCodes.append(entry.getKey());
                allAppsHashCodes.append(" , apps hashcode: ");
                allAppsHashCodes.append(entry.getValue().getAppsHashCode());
            }
            logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ", allAppsHashCodes);
        }
    } catch (Throwable e) {
        logger.error("Cannot fetch registry from server", e);
    }
}
Whether the service list is pulled at startup or by the scheduled task, both paths go through com.netflix.discovery.DiscoveryClient#fetchRegistry. From the analysis above we know the client issues two kinds of queries: a full fetch and an incremental (delta) fetch. The registry information is fetched from the eureka server, then merged and stored in the local cache localRegionApps.
- A full fetch calls the getContainers method of ApplicationsResource on the Eureka server (GET apps/).
- A delta fetch calls ApplicationsResource.getContainerDifferential (GET apps/delta); a sketch of how the client merges the delta response follows this list.
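On the client side, getAndUpdateDelta applies the delta to the local Applications copy and then compares hash codes to detect drift. The sketch below is simplified (the real updateDelta also handles remote regions, and getAndUpdateDelta swaps the local atomic reference with a CAS and falls back to a full fetch when the hash codes disagree):

private void updateDelta(Applications delta) {
    for (Application app : delta.getRegisteredApplications()) {
        for (InstanceInfo instance : app.getInstances()) {
            Applications applications = getApplications(); // the local copy behind localRegionApps
            if (ActionType.ADDED.equals(instance.getActionType())
                    || ActionType.MODIFIED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                // addInstance replaces any existing instance with the same id
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            } else if (ActionType.DELETED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp != null) {
                    existingApp.removeInstance(instance);
                }
            }
        }
    }
    // The caller then compares the local apps hash code with the one sent by the server;
    // if they differ, a full registry fetch is triggered to re-sync.
}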
ApplicationsResource.getContainers: handles the client's request for the full registry information.
@GET
public Response getContainers(@PathParam("version") String version,
                              @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                              @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                              @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                              @Context UriInfo uriInfo,
                              @Nullable @QueryParam("regions") String regionsStr) {
    boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
    String[] regions = null;
    if (!isRemoteRegionRequested) {
        EurekaMonitors.GET_ALL.increment();
    } else {
        regions = regionsStr.toLowerCase().split(",");
        Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
        EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
    }
    // Check if the server allows the access to the registry. The server can restrict access
    // if it is not ready to serve traffic depending on various reasons.
    // If the Eureka server cannot serve the request, return 403
    if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
        return Response.status(Status.FORBIDDEN).build();
    }
    CurrentRequestVersion.set(Version.toEnum(version));
    KeyType keyType = Key.KeyType.JSON; // response format, JSON by default
    String returnMediaType = MediaType.APPLICATION_JSON;
    if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
        // If the Accept header does not name a concrete format, return XML
        keyType = Key.KeyType.XML;
        returnMediaType = MediaType.APPLICATION_XML;
    }
    // Build the cache key
    Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );
    // Return the data in the requested encoding; both branches read from the same cache
    Response response;
    if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        response = Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    } else {
        // This is the read from the server-side response cache mentioned above
        response = Response.ok(responseCache.get(cacheKey))
                .build();
    }
    CurrentRequestVersion.remove();
    return response;
}
The service information is then returned to the client, which completes the flow. The flow chart is as follows:
Eureka's self-preservation mechanism:
Having gone through the registration and discovery source code, let's turn back and look at how Eureka's self-preservation mechanism is implemented.
The self-preservation mechanism is built around two important variables, both defined in the AbstractInstanceRegistry class:
// the minimum number of renewals per minute (the threshold)
protected volatile int numberOfRenewsPerMinThreshold;
// the expected number of clients sending renewals, i.e. how many services are registered with this Eureka server
protected volatile int expectedNumberOfClientsSendingRenews;
numberOfRenewsPerMinThreshold is the minimum number of renewals per minute. What does that mean? It is the threshold for the total number of client renewals Eureka Server expects to receive each minute; if the actual number drops below it, the self-preservation mechanism is triggered. It is assigned in the following code:
protected void updateRenewsPerMinThreshold() {
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}
// self-preservation threshold = number of clients
//                               * renewals per minute per client (60s / client renewal interval)
//                               * renewal percent threshold factor, 0.85 by default
- getExpectedClientRenewalIntervalSeconds: the client renewal interval, 30s by default.
- getRenewalPercentThreshold: the self-preservation renewal percent threshold factor, 0.85 by default, meaning the number of renewals per minute has to stay above 85% of the expected count. A worked example follows.
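As a quick worked example with hypothetical numbers, plugging the defaults into the formula above:

// Hypothetical example: 10 registered clients, default 30s renewal interval, factor 0.85
int expectedNumberOfClientsSendingRenews = 10;
int expectedClientRenewalIntervalSeconds = 30;
double renewalPercentThreshold = 0.85;

int numberOfRenewsPerMinThreshold =
        (int) (expectedNumberOfClientsSendingRenews
                * (60.0 / expectedClientRenewalIntervalSeconds)  // 2 renewals per client per minute
                * renewalPercentThreshold);                      // = (int) (10 * 2 * 0.85) = 17

// If fewer than 17 heartbeats arrive in the last minute, self-preservation kicks in
// and Eureka Server stops evicting instances.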
Updating these two variables: note that both are updated dynamically, and there are four places where that happens.
1. Eureka Server initialization: the EurekaBootStrap class has an initEurekaServerContext method.
protected void initEurekaServerContext() throws Exception { EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig(); // ...... registry.openForTraffic(applicationInfoManager, registryCount); // Register all monitoring statistics. EurekaMonitors.registerAllStats(); } public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { // Renewals happen every 30 seconds and for a minute it should be a factor of 2. this.expectedNumberOfClientsSendingRenews = count;//初始化 updateRenewsPerMinThreshold();//更新numberOfRenewsPerMinThreshold logger.info("Got {} instances from neighboring DS node", count); logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold); this.startupTime = System.currentTimeMillis(); if (count > 0) { this.peerInstancesTransferEmptyOnStartup = false; } DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName(); boolean isAws = Name.Amazon == selfName; if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) { logger.info("Priming AWS connections for all replicas.."); primeAwsReplicas(applicationInfoManager); } logger.info("Changing status to UP"); applicationInfoManager.setInstanceStatus(InstanceStatus.UP); super.postInit(); }
2. PeerAwareInstanceRegistryImpl.cancel: when a service provider goes offline gracefully, Eureka Server has to remove that provider's address, which also means the renewal threshold has to change. The update happens along the call path PeerAwareInstanceRegistryImpl.cancel -> AbstractInstanceRegistry.cancel -> internalCancel.
protected boolean internalCancel(String appName, String id, boolean isReplication) { try { read.lock(); //....... synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to cancel it, reduce the number of clients to send renews. this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1; updateRenewsPerMinThreshold(); } } return true; }
3. PeerAwareInstanceRegistryImpl.register: when a new service provider registers with the eureka-server, the number of renewing clients has to be increased; this is handled in the register method, register -> super.register (AbstractInstanceRegistry).
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { //....... // The lease does not exist and hence it is a new registration synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to register it, increase the number of clients sending renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1; updateRenewsPerMinThreshold(); } } logger.debug("No previous lease information found; it is new registration"); } //........ }
4. PeerAwareInstanceRegistryImpl.scheduleRenewalThresholdUpdateTask: runs every 15 minutes and re-evaluates the expected renewal numbers, updating them only if the current instance count is above 85% of the expectation (or if self-preservation is disabled). It is scheduled from DefaultEurekaServerContext ---> the @PostConstruct initialize() method ---> init().
private void updateRenewalThreshold() { try { Applications apps = eurekaClient.getApplications(); int count = 0; for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { if (this.isRegisterable(instance)) { ++count; } } } synchronized (lock) { // Update threshold only if the threshold is greater than the // current expected threshold or if self preservation is disabled. if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews) || (!this.isSelfPreservationModeEnabled())) { this.expectedNumberOfClientsSendingRenews = count; updateRenewsPerMinThreshold(); } } logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold); } catch (Throwable e) { logger.error("Cannot update renewal threshold", e); } }
The task that triggers self-preservation:
In AbstractInstanceRegistry's postInit method, an EvictionTask is scheduled; this task checks whether the self-preservation mechanism needs to kick in before evicting anything.
protected void postInit() { renewsLastMin.start(); if (evictionTaskRef.get() != null) { evictionTaskRef.get().cancel(); } evictionTaskRef.set(new EvictionTask()); evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs()); }
EvictionTask is the task that ultimately runs:
@Override
public void run() {
    try {
        long compensationTimeMs = getCompensationTimeMs();
        logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
        evict(compensationTimeMs);
    } catch (Throwable e) {
        logger.error("Could not run the evict task", e);
    }
}

public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");
    // If self-preservation needs to kick in, return right away and skip the rest
    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }
    // The rest of this method performs the actual eviction of expired instances
}
isLeaseExpirationEnabled:
- checks whether self-preservation is enabled at all (it is by default); if not, leases are simply allowed to expire
- otherwise decides whether self-preservation should kick in, by checking whether the number of renewals received in the last minute is greater than numberOfRenewsPerMinThreshold
public boolean isLeaseExpirationEnabled() { if (!isSelfPreservationModeEnabled()) { // The self preservation mode is disabled, hence allowing the instances to expire. return true; } return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold; }
At a glance, everything falls into place; here is the flow chart of the self-preservation mechanism:
The source code for these Eureka flows is fairly convoluted, but if you settle down and read through it carefully a few times it is quite manageable. For more details, please refer to the official documentation.