Spring Cloud進階篇之Eureka原理分析


前言

之前寫了幾篇Spring Cloud的小白教程,相信看過的朋友對Spring Cloud中的一些應用有了簡單的了解,寫小白篇的目的就是為初學者建立一個基本概念,讓初學者在學習的道路上建立一定的基礎。

從今天開始,我會持續更新幾篇Spring Cloud的進階教程。

Eureka簡介

Eureka是Netflix開發的服務發現框架,本身就是一個基於REST的服務。Spring Cloud將它集成在其子項目spring-cloud-netflix中,用來實現服務的注冊與發現功能。

Eureka總體架構圖

Eureka組件介紹

  • 服務注冊中心集群

分別部署在IDC1、IDC2、IDC3中心

  • 服務提供者

服務提供者一個部署在IDC1,一個部署在IDC3

  • 服務消費者

服務消費者一個部署在IDC1,一個部署在IDC2

組件之間的調用關系

服務提供者

  • 啟動服務:服務提供者會向服務注冊中心發起Register請求,注冊服務。
  • 運行過程中:服務提供者會定時向注冊中心發送Renew心跳,告訴它“我還活着”。
  • 停止服務提供:服務提供者會向服務注冊中心發送Cancel請求,告訴它清空當前服務注冊信息。

服務消費者

  • 啟動后:從服務注冊中心拉取服務注冊信息。
  • 運行過程中:定時更新服務注冊信息。
  • 發起遠程調用
  • - 服務消費者會從服務注冊中心選擇同機房的服務提供者,然后發起遠程調用,只有同機房的服務提供者宕機才會去選擇其他機房的服務提供者。
  • 如果服務消費者發現同機房沒有服務提供者,則會按照負載均衡算法 選擇其他機房的服務提供者,然后發起遠程調用。

注冊中心

  • 啟動后:從其他節點拉取服務注冊信息
  • 運行過程中:
  • - 定時運行Evict任務,定時清理沒有按時發送Renew的服務提供者,這里的清理會將非常正常停止、網絡異常等其他因素引起的所有服務。
  • 接收到的Register、Renew、Cancel請求,都會同步到其他的注冊中心節點。

Eureka Server會通過Register、Renew、Get Registry等接口提供服務的注冊、發現和心跳檢測等。

Eureka Client是一個java客戶端,用於簡化與Eureka Server的交互,客戶端本身也內置了負載均衡器(默認使用round-robin方式),在啟動后會向Eureka Server發送心跳檢測,默認周期為30s,Eureka Server如果在多個心跳周期內沒有接收到Eureka client的某一個節點的心跳請求,Eureka Server會從服務注冊中心清理到對應的Eureka Client的服務節點(默認90s)。

數據結構

服務存儲的數據結構可以簡單的理解為是一個兩層的HashMap結構(為了保證線程安全使用的ConcurrentHashMap),具體的我們可以查看源碼中的AbstractInstanceRegistry類:

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();

第一層ConcurrentHashMap的key=spring.application.name,也就是應用名稱,value為ConcurrentHashMap。

第二層ConcurrentHashMap的key=instanceId,也就是服務的唯一實例id,value為Lease對象,也就是具體的服務。Lease其實就是對InstanceInfo的包裝,里面保存了實例信息、服務注冊的時間等。具體的我們可以查看InstanceInfo源碼。

數據存儲過程

Eureka是通過REST接口對外提供服務的。

這里我以注冊為例(ApplicationResource),首先將PeerAwareInstanceRegistry的實例注入到ApplicationResource的成員變量的registry里。

  • ApplicationResource接收到請求后,對調用registry.register()方法。
@POST
    @Consumes({"application/json""application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION)
 String isReplication) 
{
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }

        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }
  • AbstractInstanceRegistry在register方法里完成對服務信息的存儲。
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
                // InstanceInfo instead of the server local copy.
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfClientsSendingRenews > 0) {
                        // Since the client wants to register it, increase the number of clients sending renews
                        this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                        updateRenewsPerMinThreshold();
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }

從源碼中不難看出存儲的數據結構是雙層的HashMap。

Eureka還實現了二級緩存來保證即將對外傳輸的服務信息,

  • 一級緩存:本質還是HashMap,沒有過期時間,保存服務信息的對外輸出的數據結構。

    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
  • 二級緩存:是guava的緩存,包含失效機制,保存服務信息的對外輸出的數據結構。

    private final LoadingCache<Key, Value> readWriteCacheMap;
  • 緩存的更新:

  • 刪除二級緩存:

  • - client端發送register、renew、cancel請求並更新register注冊表之后會刪除二級緩存;

  • server端自身的Evict任務剔除服務后會刪除二級緩存;

  • 二級緩存本事設置的失效機制(指的是guava實現的readWriteCacheMap),

  • 加載二級緩存:

  • - client發送Get registry請求后,如果二級緩存中沒有,就會觸發guava的load機制,從registry中獲取原始的服務信息后進行加工處理,然后放入二級緩存中;

  • server端更新一級緩存的時候,如果二級緩存沒有數據也會觸發guava的load機制;

  • 更新一級緩存:

  • - server端內置了一個time task會定時將二級緩存中的數據同步到一級緩存中,這其中包括了刪除和更新。

緩存的機制可以查看ResponseCacheImpl源碼。

Eureka的數據結構簡單總結為:

服務注冊機制

服務注冊中心、服務提供者、服務消費者在啟動后都會向服務注冊中心發起注冊服務的請求(前提是配置了注冊服務)。

注冊中心接到register請求后:

  • 將服務信息保存到registry中;

  • 更新隊列,將該事件添加到更新隊列中,給Eureka client增量同步服務信息使用;

  • 清空二級緩存,用於保證數據的一致性;(即清空的是:readWriteCacheMap

  • 更新閾值;

  • 同步服務信息;

服務續約

服務注冊后,要定時發送續約請求(心跳檢查),證明我還活着,不要清空我的服務信息,定時時間默認30s,可以通過配置:eureka.instance.lease-renewal-interval-in-seconds來修改。

注冊中心接收到續約請求后(renew):

  • 更新服務對象的最近續約時間(lastUpdateTimestamp);
  • 將信息同步給其他的節點;

服務注銷

正常的服務停止之前會發送注銷服務請求,通知注冊中心我要下線了。

注冊中心接收到注銷請求后(cancel):

  • 將服務信息從registry中刪除;
  • 更新隊列;
  • 清空二級緩存;
  • 更新閾值;
  • 同步信息給其他節點;

說明:只有服務正常停止才會發送cancel請求,非正常停止的會通過Eureka Server的主動剔除機制進行刪除。

服務剔除

服務剔除其實是一個兜底的方案,目的就是解決非正常情況下的服務宕機或其他因素導致不能發送cancel請求的服務信息清理的策略。

服務剔除分為:

  • 判斷剔除條件
  • 找出過期服務
  • 清理過期服務

剔除條件:

  • 關閉自我保護
  • 自我保護如果開啟,會先判斷是server還是client出現問題,如果是client的問題就會進行刪除;

自我保護機制:Eureka的自我保護機制是為了防止誤殺服務提供的一種保護機制。Eureka的自我保護機制認為如果有大量的服務都續約失敗,則認為自己出現了問題(例如:自己斷網了),也就不剔除了。反之,則是它人的問題,就進行剔除。

自我保護的閾值分為server和client,如果超出閾值就是表示大量服務可用,部分服務不可用,這判定為client端出現問題。如果未超出閾值就是表示大量服務不可用,則判定是自己出現了問題。

閾值的計算:

  • 自我保護閾值 = 服務總數 * 每分鍾續約數 * 自我保護閾值因子;
  • 每分鍾續約數 = (60s / 客戶端續約時間);

過期服務:

找出過期服務會遍歷所有的服務,判斷上次續約時間距離當前時間大於閾值就標記為過期,同時會將這些過期的服務保存的過期的服務集合中。

剔除服務:

剔除服務之前會先計算要是剔除的服務數量,然后遍歷過期服務,通過洗牌算法確保每次都公平的選擇出要剔除的服務,然后進行剔除。

執行剔除服務后:

  • 從register中刪除服務信息;
  • 更新隊列;
  • 清空二級緩存,保證數據的一致性;

服務獲取

Eureka Client服務的獲取都是從緩存中獲取,如果緩存中沒有,就加載數據到緩存中,然后在從緩存中取。服務的獲取方式分為全量同步和增量同步兩種。

registry中只保存數據結構,緩存中存ready的服務信息

  • 先讀取一級緩存
  • 先判斷是否開啟一級緩存
  • 如果開啟一級緩存,就從一級緩存中取,如果一級緩存中沒有,則從二級緩存中取;
  • 如果沒有開啟一級緩存,則直接從二級緩存中取;
  • 再讀取二級緩存
  • 如果二級緩存中存在,則直接返回;
  • 如果二級緩存中不存在,則先將數據加載到二級緩存中,然后再讀取二級緩存中的數據。

注意:加載二級緩存的時候需要判斷是全量還是增量,如果是增量的話,就從recentlyChangedQueue中加載,如果是全量的話就從registry中加載。

服務同步

服務同步是Server節點之間的數據同步。分為啟動時同步,運行時同步。

  • 啟動同步

啟動同步時,會先遍歷Applications中獲取的服務信息,並將服務信息注冊到registry中。可以參考PeerAwareInstanceRegistryImpl類中的syncUp方法:

public int syncUp() {
       // Copy entire entry from neighboring DS node
       int count = 0;

       for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
           if (i > 0) {
               try {
                   Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
               } catch (InterruptedException e) {
                   logger.warn("Interrupted during registry transfer..");
                   break;
               }
           }
           Applications apps = eurekaClient.getApplications();
           for (Application app : apps.getRegisteredApplications()) {
               for (InstanceInfo instance : app.getInstances()) {
                   try {
                       if (isRegisterable(instance)) {
                           register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                           count++;
                       }
                   } catch (Throwable t) {
                       logger.error("During DS init copy", t);
                   }
               }
           }
       }
       return count;
   }

注意這個方法使用類兩層for循環,第一次循環時保證自己已經拉取到服務信息,第二層循環是遍歷拉取到服務注冊信息。

  • 運行時同步

server端當有reigster、renew、cancel請求進來時,會將這些請求封裝到一個task中,然后放到一個隊列當中,然后經過一系列的處理后,在放到另一個隊列中。 可以查看PeerAwareInstanceRegistryImpl類中的BatchWorkerRunnable類,這里就不再貼源碼了。

總結

Eureka的原理接介紹到這里,從整體上看似簡單,但實現細節相關復雜。得多看幾遍源碼才能猜透他們的設計思路。

Eureka作為服務的注冊與發現,它實際的設計原則是遵循AP原則,也就是“數據的最終一致性”。現在還有好多公司使用zk、nacos來作為服務的注冊中心,后續會簡單更新一篇關於服務注冊中心的對比,這里就不過多闡述。


  • 寫作不易,轉載請注明出處,喜歡的小伙伴可以關注公眾號查看更多喜歡的文章。
  • 聯系方式:4272231@163.com QQ:95472323


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM