SpringCloud--Eureka--原理及源碼解析


一、Eureka的基礎架構及服務治理機制

  Eureka服務治理的基礎架構包含三個核心:服務注冊中心、服務提供者、服務消費者。其中服務注冊中心,即Eureka提供的服務端,提供服務注冊和發現的功能;服務提供者,即將自己的服務注冊到注冊中心;服務的消費者,從注冊中心獲取服務列表,從而使消費者知道到何處調用服務,服務消費可以使用Ribbon、Feign等。

  1、服務提供者:

    (1)服務注冊:服務提供者在項目啟動時,會通過發送REST請求的方式將自己注冊到eureka server上,同時帶上一些自己的元數據,Eureka Server收到請求后,將元數據存儲在一個雙層map中,第一層的key是服務名稱,第二層的key是具體的服務實例。

    (2)服務同步:如果A項目將服務注冊到了M注冊中心,B項目將服務注冊到N注冊中心,但是如果M項目和N項目開啟了可以注冊為服務的配置,那么當A項目將服務注冊到M注冊中心時,M注冊中心會將請求轉發到N注冊中心,以保證兩個注冊中心副本中服務同步。

    (3)服務續約:在注冊完服務后,服務提供者會維護一個心跳來持續告訴注冊中心其還活着,以防止注冊中心的剔除任務將該服務實例從服務列表中刪除。

    關於心跳頻率與剔除任務認為服務失效時間的配置參數如下所示(配置值均為默認值):

eureka:
  instance:
    # 心跳檢測頻率
    lease-renewal-interval-in-seconds: 30 # 服務失效時間 lease-expiration-duration-in-seconds: 90

  2、服務消費者:

    (1)獲取服務:當啟動服務消費者項目時,會向注冊中心發送一個REST請求來獲取注冊中心上注冊的服務清單。為了性能的考慮,注冊中心自己維護了一份只讀的注冊服務清單,每30秒更新一次,要調整注冊中心中注冊服務清單更新頻率,可以使用如下參數進行設置(下面示例為默認值),同時,由於獲取服務是服務消費的基礎,因此需要保證eureka.client.fetch-registry為true

eureka:
  client:
    registry-fetch-interval-seconds: 30 fetch-registry: true

    (2)服務調用:服務消費者在獲取到服務提供清單后,會根據服務名獲得具體的實例名和該實例的元數據,然后客戶端可以根據自己需要,選擇調用哪個實例,在上述代碼樣例中,我們使用的是Ribbon來做負載均衡,而ribbon默認采用輪詢的方式進行調用,從而實現客戶端的負載。對於訪問實例的選擇,Eureka中有Region和Zone的概念,一個Region中可以包含多個Zone,一個客戶端會被注冊到一個Zone中,所以一個客戶端只對應一個Zone和一個Region,在服務調用時,優先訪問處於同一個Zone中的服務提供者,若訪問不到,再訪問其他Zone中的服務提供者。

    (3)服務下線:當客戶端實例進行正常的關閉操作時,它會觸發一個服務下線的REST請求給注冊中心,告訴注冊中心其要下線,注冊中心收到請求后,將該服務狀態置為下線,並把該事件傳播出去。

  3、服務注冊中心

    (1)失效剔除:有時服務實例並不會正常下線,可能是由於內存溢出、網絡故障等原因使得服務不能正常運行,所以注冊中心並未收到服務下線的請求。為了剔除該類不可用服務提供者實例,Eureka Server在啟動時,會創建一個定時任務,每隔一段時間(默認60秒)將當前清單中超時(默認90秒)沒有續約的服務剔除出去。

    (2)自我保護:前面提到過,服務提供者啟動后,會維護一個心跳,定時向注冊中心發送心跳,告訴注冊中心自己還活着。注冊中心的運行期間,會統計心跳失敗的比例在15分鍾內是否低於85%,如果低於85%,注冊中心會將該服務的實例保護起來,不讓其過期,但是由於在本地測試,所以這個情況非常容易滿足(而線上則主要是由於網絡不穩定等導致),這就導致在保護期間內,如果服務提供者實例出現問題,那么客戶端就會拿到有問題的實例,將會出現調用失敗的情況,因此客戶端必須要有容錯機制,比如說請求重試、斷路器等機制。如果我們想關閉自我保護機制,可以使用如下參數。

eureka:
  server:
    enable-self-preservation: false

在我們沒有關閉自我保護之前,當我們在之前訪問注冊中心時:http://localhost:1112/,會看到紅色警告(警告內容如下圖所示),這就是觸發了Eureka Server的自我保護機制。

     

二、Eureka源碼分析

  1、服務注冊中心的加載

  首先從服務提供者開始看,例如eureka-client項目,我們主要是在主函數上添加了@EnableDiscoveryClient注解,以及在配置文件中添加了注冊中心地址等配置信息,那么源碼入口就可以從@EnableDiscoveryClient注解開始看,從該注解我們可以了解,主要是開啟DiscoveryClient,那么全局搜索DiscoveryClient,可以發現有兩個,一個是springCloud提供的接口org.springframework.cloud.client.discovery.DiscoveryClient,另外一個是netflix的實現com.netflix.discovery.DiscoveryClient,可以看下類的依賴關系,SpringCloud提供的接口類依賴關系及 netflix的實現類的類依賴關系分別如以下左右兩張圖:

 

 

 而org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient又使用了com.netflix.discovery.EurekaClient,所以,總的依賴關系如下:

org.springframework.cloud.client.discovery.DiscoveryClient提供了Springcloud服務注冊相關的接口,而org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient是netflix公司對於該接口的實現,而該實現,是包裝了netflix公司開源項目中的com.netflix.discovery.EurekaClient接口及實現com.netflix.discovery.DiscoveryClient。

  可以看調用鏈,先調用了com.netflix.discovery.DiscoveryClient#getEurekaServiceUrlsFromConfig方法

    /**
     * @deprecated use {@link #getServiceUrlsFromConfig(String, boolean)} instead.
     */
    @Deprecated
    public static List<String> getEurekaServiceUrlsFromConfig(String instanceZone, boolean preferSameZone) { return EndpointUtils.getServiceUrlsFromConfig(staticClientConfig, instanceZone, preferSameZone); }

  可以看到該方法已經過期,被使用@link到了替代方法com.netflix.discovery.DiscoveryClient#getServiceUrlsFromConfig

    @Deprecated
    @Override
    public List<String> getServiceUrlsFromConfig(String instanceZone, boolean preferSameZone) { return EndpointUtils.getServiceUrlsFromConfig(clientConfig, instanceZone, preferSameZone); }

  然后,調用到了com.netflix.discovery.endpoint.EndpointUtils#getServiceUrlsFromConfig方法

    public static List<String> getServiceUrlsFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {
        List<String> orderedUrls = new ArrayList<String>(); String region = getRegion(clientConfig); String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion()); if (availZones == null || availZones.length == 0) { availZones = new String[1]; availZones[0] = DEFAULT_ZONE; } logger.debug("The availability zone for the given region {} are {}", region, availZones); int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones); List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]); if (serviceUrls != null) { orderedUrls.addAll(serviceUrls); } int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1); while (currentOffset != myZoneOffset) { serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[currentOffset]); if (serviceUrls != null) { orderedUrls.addAll(serviceUrls); } if (currentOffset == (availZones.length - 1)) { currentOffset = 0; } else { currentOffset++; } } if (orderedUrls.size() < 1) { throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!"); } return orderedUrls; }

  該方法依次操作:獲取項目對應的Region、獲取項目對應的Zones數組(如果沒有,則取默認Zone)、計算項目對應Zones數組的偏移量、獲取注冊中心、計算當前偏移量、根據當前偏移量獲取注冊中心地址並將地址添加在地址集合中、返回地址集合

  接下來每個步驟單獨說明。

  (1)獲取項目對應的Region

    public static String getRegion(EurekaClientConfig clientConfig) {
        String region = clientConfig.getRegion(); if (region == null) { region = DEFAULT_REGION; } region = region.trim().toLowerCase(); return region; }

  這個沒什么可說的,就是從配置文件中獲取Region的配置,如果沒有,則取默認值,最終將Region轉換成大寫返回,這里可以使用如下參數設置Region

eureka:
  client:
    region: test-1

  (2)獲取項目對應的Zones數組

    public String[] getAvailabilityZones(String region) {
        String value = this.availabilityZones.get(region); if (value == null) { value = DEFAULT_ZONE; } return value.split(","); }

  根據region獲取配置文件中zone數組,如果沒有配置,則取默認值(上面使用的eureka.client.service-url.defaultZone即是默認配置),若要指定Zone,可以使用如下配置:

 

eureka:
  client:
    #client所在zone為availabilityZones的第一個zone,如果未配置,則為defaultZone
    prefer-same-zone-eureka: true region: region1 availability-zones: region1: zone1,zone2,zone3 region2: zone4,zone5,zone6 service-url: zone1: http://localhost:1111/eureka/ zone2: http://localhost:1111/eureka/ zone3: http://localhost:1111/eureka/ zone4: http://localhost:1112/eureka/ zone5: http://localhost:1112/eureka/ zone6: http://localhost:1112/eureka/

 

  (3)獲取注冊中心,計算zone數組下標,獲得zone,然后根據zone獲取注冊中心

    public List<String> getEurekaServerServiceUrls(String myZone) {
        String serviceUrls = this.serviceUrl.get(myZone); if (serviceUrls == null || serviceUrls.isEmpty()) { serviceUrls = this.serviceUrl.get(DEFAULT_ZONE); } if (!StringUtils.isEmpty(serviceUrls)) { final String[] serviceUrlsSplit = StringUtils .commaDelimitedListToStringArray(serviceUrls); List<String> eurekaServiceUrls = new ArrayList<>(serviceUrlsSplit.length); for (String eurekaServiceUrl : serviceUrlsSplit) { if (!endsWithSlash(eurekaServiceUrl)) { eurekaServiceUrl += "/"; } eurekaServiceUrls.add(eurekaServiceUrl.trim()); } return eurekaServiceUrls; } return new ArrayList<>(); }

  獲取zone配置的注冊中心,如果沒有,則取默認的注冊中心,然后使用逗號切割后組裝成集合返回。

  當我們在使用Ribbon來實現服務調用時,對於zone的設置可以實現區域親和性,Ribbon會優先訪問屬於同一Zone中的服務實例,只有當同一zone中沒有可用實例后,才會訪問其他zone中的實例。所以通過zone屬性的定義,配合實際部署的物理結構,我們就可以有效的設計出針對區域性故障的容錯集群。

  2、服務注冊

  在com.netflix.discovery.DiscoveryClient構造函數中調用了com.netflix.discovery.DiscoveryClient#initScheduledTasks方法

    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); cacheRefreshTask = new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ); scheduler.schedule( cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer heartbeatTask = new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ); scheduler.schedule( heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize  statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }

  對於該方法,依次錯了如下操作:

    判斷是否開啟服務獲取,如果開啟,創建一個定時任務,定時刷新服務列表

    判斷是否開啟服務注冊,如果開啟,添加一個服務續租定時任務;異步注冊服務。

  接下來我們一一查看操作:

  (1)判斷是否開啟服務注冊,如果開啟,添加一個服務續租定時任務;異步注冊服務。

  這個判斷使用的參數是上述的eureka.client.register-with-eureka參數,如果配置為true,創建了一個服務續租的定時任務,還創建了一個異步注冊的任務。

  a、先看異步注冊服務類InstanceInfoReplicator,該類實現了Rnnable接口,因此直接看run方法

    public void run() {
        try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }

這里注冊的,是discoveryClient.register();這一行代碼,查看register方法

    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode(); }

可以看到,這里使用了http請求將元數據InstanceInfo請求到注冊中心。

  b、然后看服務續租定時任務

  該任務的執行類是HeartbeatThread,直接看run方法,潤方法中調用了com.netflix.discovery.DiscoveryClient#renew方法

    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == Status.OK.getStatusCode(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } }

  可以發現該方法,也是使用http的請求,將appname等信息發送給注冊中心。

  (2)判斷是否開啟服務獲取,如果開啟,創建一個定時任務,定時刷新服務列表

  這個判斷對應上述使用的參數eureka.client.fetch-registry,如果開啟則創建一個定時任務,該定時任務的執行頻率等都是使用參數配置的(參數內容不再單獨說明,看屬性名稱即可判斷配置參數名稱),然后主要看調用的實現類CacheRefreshThread,該實現類的run方法調用了com.netflix.discovery.DiscoveryClient#refreshRegistry方法:

    @VisibleForTesting
    void refreshRegistry() { try { boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries(); boolean remoteRegionsModified = false; // This makes sure that a dynamic change to remote regions to fetch is honored. String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions(); if (null != latestRemoteRegions) { String currentRemoteRegions = remoteRegionsToFetch.get(); if (!latestRemoteRegions.equals(currentRemoteRegions)) { // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync  synchronized (instanceRegionChecker.getAzToRegionMapper()) { if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) { String[] remoteRegions = latestRemoteRegions.split(","); remoteRegionsRef.set(remoteRegions); instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions); remoteRegionsModified = true; } else { logger.info("Remote regions to fetch modified concurrently," + " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions); } } } else { // Just refresh mapping to reflect any DNS/Property change  instanceRegionChecker.getAzToRegionMapper().refreshMapping(); } } boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } if (logger.isDebugEnabled()) { StringBuilder allAppsHashCodes = new StringBuilder(); allAppsHashCodes.append("Local region apps hashcode: "); allAppsHashCodes.append(localRegionApps.get().getAppsHashCode()); allAppsHashCodes.append(", is fetching remote regions? "); allAppsHashCodes.append(isFetchingRemoteRegionRegistries); for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) { allAppsHashCodes.append(", Remote region: "); allAppsHashCodes.append(entry.getKey()); allAppsHashCodes.append(" , apps hashcode: "); allAppsHashCodes.append(entry.getValue().getAppsHashCode()); } logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ", allAppsHashCodes); } } catch (Throwable e) { logger.error("Cannot fetch registry from server", e); } }

  前面的一大堆都是校驗,后面的一大堆都是輸出,唯一有用的是:boolean success = fetchRegistry(remoteRegionsModified);里面會根據是否是第一發起服務獲取請求做不同的請求處理。

  3、注冊中心

  Eureka server對於各類rest請求的定義都位於com.netflix.eureka.resources包下,以服務注冊方法為例:com.netflix.eureka.resources.ApplicationResource#addInstance

    @POST
    @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // validate that the instanceinfo contains all the necessary required fields if (isBlank(info.getId())) { return Response.status(400).entity("Missing instanceId").build(); } else if (isBlank(info.getHostName())) { return Response.status(400).entity("Missing hostname").build(); } else if (isBlank(info.getIPAddr())) { return Response.status(400).entity("Missing ip address").build(); } else if (isBlank(info.getAppName())) { return Response.status(400).entity("Missing appName").build(); } else if (!appName.equals(info.getAppName())) { return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null) { return Response.status(400).entity("Missing dataCenterInfo").build(); } else if (info.getDataCenterInfo().getName() == null) { return Response.status(400).entity("Missing dataCenterInfo Name").build(); } // handle cases where clients may be registering with bad DataCenterInfo with missing data DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible }

  上面一通校驗,對業務邏輯有影響的只有registry.register(info, "true".equals(isReplication));最終調用到org.springframework.cloud.netflix.eureka.server.InstanceRegistry#register(com.netflix.appinfo.InstanceInfo, boolean)方法

    public void register(final InstanceInfo info, final boolean isReplication) {
        handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
        super.register(info, isReplication);
    }

  可以看到,先調用了handleRegistration方法將注冊事件通知出去,然后調用了父類的register方法將服務注冊。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM