spring cloud eureka注冊原理-注冊失敗填坑


寫在前面

  我們知道Eureka分為兩部分,Eureka Server和Eureka Client。Eureka Server充當注冊中心的角色,Eureka Client相對於Eureka Server來說是客戶端,需要將自身信息注冊到注冊中心。本文主要介紹的就是在Eureka Client注冊到Eureka Server時RetryableClientQuarantineRefreshPercentage參數的使用技巧。

Eureka Client注冊過程分析

  Eureka Client注冊到Eureka Server時,首先遇到第一個問題就是Eureka Client端要知道Server的地址,這個參數對應的是eureka.client.service-url.defaultZone舉個例子,在Eureka Client的properties文件中配置如下:

eureka.client.service-url.defaultZone=
http://localhost:8761/eureka,http://localhost:8762/eureka,http://localhost:8763/eureka,http://localhost:8764/eureka

  如上所示,Eureka Client配置對應的Eureka Server地址分別是8761、8762、8763、8764。這里存在兩個問題:

  • Eureka Client會將自身信息分別注冊到這四個地址嗎?
  • Eureka Clinent注冊機制是怎樣的?

  源碼面前一目了然,帶着這兩個問題我們通過源碼來解答這兩個問題。Eureka Client在啟動的時候注冊源碼如下:
RetryableEurekaHttpClient中的execut方法

 @Override
  protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
      List<EurekaEndpoint> candidateHosts = null;
      int endpointIdx = 0;
      for (int retry = 0; retry < numberOfRetries; retry++) {
          EurekaHttpClient currentHttpClient = delegate.get();
          EurekaEndpoint currentEndpoint = null;
          if (currentHttpClient == null) {
              if (candidateHosts == null) {
                  candidateHosts = getHostCandidates();
                  if (candidateHosts.isEmpty()) {
                      throw new TransportException("There is no known eureka server; cluster server list is empty");
                  }
              }
              if (endpointIdx >= candidateHosts.size()) {
                  throw new TransportException("Cannot execute request on any known server");
              }

              currentEndpoint = candidateHosts.get(endpointIdx++);
              currentHttpClient = clientFactory.newClient(currentEndpoint);
          }

          try {
              EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
              if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                  delegate.set(currentHttpClient);
                  if (retry > 0) {
                      logger.info("Request execution succeeded on retry #{}", retry);
                  }
                  return response;
              }
              logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
          } catch (Exception e) {
              logger.warn("Request execution failed with message: {}", e.getMessage());  // just log message as the underlying client should log the stacktrace
          }

          // Connection error or 5xx from the server that must be retried on another server
          delegate.compareAndSet(currentHttpClient, null);
          if (currentEndpoint != null) {
              quarantineSet.add(currentEndpoint);
          }
      }
      throw new TransportException("Retry limit reached; giving up on completing the request");
  }

  按照我的理解,代碼精簡后內容如下:

int endpointIdx = 0;
//用來保存所有Eureka Server信息(8761、8762、8763、8764)
List<EurekaEndpoint> candidateHosts = null;
//numberOfRetries的值代碼寫死默認為3次
for (int retry = 0; retry < numberOfRetries; retry++) {
    /**
     *首次進入循環時,獲取全量的Eureka Server信息(8761、8762、8763、8764)
     */
    if (candidateHosts == null) {
        candidateHosts = getHostCandidates();
    }
    /**
     *通過endpointIdx自增,依次獲取Eureka Server信息,然后發送
     *注冊的Post請求.
     */
    currentEndpoint = candidateHosts.get(endpointIdx++);
    currentHttpClient = clientFactory.newClient(currentEndpoint);
    try {
       /**
         *發送注冊的Post請求動作,注意如果成功,則跳出循環,如果失敗則
         *根據endpointIdx依次獲取下一個Eureka Server.
         */
        response = requestExecutor.execute(currentHttpClient);
        return respones;
    } catch (Exception e) {
        //向注冊中心(Eureka Server)發起注冊的post出現異常時,打印日志...
    }
    //如果此次注冊動作失敗,將當前的信息保存到quarantineSet中(一個Set集合)
    if (currentEndpoint != null) {
        quarantineSet.add(currentEndpoint);
    }
}
//如果都失敗,則以異常形式拋出...
throw new TransportException("Retry limit reached; giving up on completing the request");

  上面代碼中還有一個方法很重要就是List<EurekaEndpoint> candidateHosts = getHostCandidates();接下來看下getHostCandidates()方法源碼

    private List<EurekaEndpoint> getHostCandidates() {
        List<EurekaEndpoint> candidateHosts = clusterResolver.getClusterEndpoints();
        quarantineSet.retainAll(candidateHosts);

        // If enough hosts are bad, we have no choice but start over again
        int threshold = (int) (candidateHosts.size() * transportConfig.getRetryableClientQuarantineRefreshPercentage());
        if (quarantineSet.isEmpty()) {
            // no-op
        } else if (quarantineSet.size() >= threshold) {
            logger.debug("Clearing quarantined list of size {}", quarantineSet.size());
            quarantineSet.clear();
        } else {
            List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
            for (EurekaEndpoint endpoint : candidateHosts) {
                if (!quarantineSet.contains(endpoint)) {
                    remainingHosts.add(endpoint);
                }
            }
            candidateHosts = remainingHosts;
        }

        return candidateHosts;
    }

  按照我的理解,將代碼精簡下,只包括關鍵邏輯,內容如下:

private List<EurekaEndpoint> getHostCandidates() {
    /**
     * 獲取所有defaultZone配置的注冊中心信息(Eureka Server),
     * 在本文例子中代表4個(8761、8762、8763、8764)Eureka Server
     */
    List candidateHosts = clusterResolver.getClusterEndpoints();
    /**
     * quarantineSet這個Set集合中保存的是不可用的Eureka Server
     * 此處是拿不可用的Eureka Server與全量的Eureka Server取交集
     */
    quarantineSet.retainAll(candidateHosts);
    /**
     * 根據RetryableClientQuarantineRefreshPercentage參數計算閾值
     * 該閾值后續會和quarantineSet中保存的不可用的Eureka Server個數
     * 作比較,從而判斷是否返回全量的Eureka Server還是過濾掉不可用的
     * Eureka Server。
     */
    int threshold = 
       (int) (
        candidateHosts.size()
              *
        transportConfig.getRetryableClientQuarantineRefreshPercentage()
        );
    if (quarantineSet.isEmpty()) {
        /**
         * 首次進入的時候,此時quarantineSet為空,直接返回全量的
         * Eureka Server列表
         */
    } else if (quarantineSet.size() >= threshold) {
        /**
         * 將不可用的Eureka Server與threshold值相比較,如果不可
         * 用的Eureka Server個數大於閾值,則將之前保存的Eureka
         * Server內容直接清空,並返回全量的Eureka Server列表。
         */
        quarantineSet.clear();
    } else {
        /**
         * 通過quarantineSet集合保存不可用的Eureka Server來過濾
         * 全量的EurekaServer,從而獲取此次Eureka Client要注冊要
         * 注冊的Eureka Server實例地址。
         */
        List<EurekaEndpoint> remainingHosts = new ArrayList<>(candidateHosts.size());
        for (EurekaEndpoint endpoint : candidateHosts) {
            if (!quarantineSet.contains(endpoint)) {
                remainingHosts.add(endpoint);
            }
        }
        candidateHosts = remainingHosts;
    }
    return candidateHosts;
}

  通過源碼分析,我們現在初步知道,當Eureka Client向Eureka Server發起注冊請求的時候(根據defaultZone尋找Eureka Server列表),如果有一次請求注冊成功,那么后續就不會在向其他Eureka Server發起注冊請求。以本文為例,注冊中心有四個(8761、8762、8763、8764)。如果8761對應的Eureka Server服務的狀態是UP,那么Eureka Client向該注冊中心注冊成功后,不會再向(8762、8763、8764)對應的Eureka Server發起注冊請求(對應程序是在for循環中直接return respones)。

  說到這里又引出來另外一個問題,如果8761這個Eureka Server是down掉的呢?
根據源碼我們可知Eureka Client首次會向8761這個Server發起注冊請求,如果該Server的狀態是down,那么它會將該Server保存到quarantineSet這個Set集合中,然后再次訪問8762這個Eureka Server,如果8762這個Server的狀態依舊是down,它也會把這個Server保存到quarantineSet這個Set集合中,然后繼續訪問8763這個Server,如果8763這個Server的狀態依舊是down,此時除了會將其保存到quarantineSet這個Set集合中之外,還會跳出本次循環。從而結束此次注冊過程。

  道這里有人要問接下來會不會向8764這個Server發起注冊,答案是否定的,因為循環的次數默認是3次。所以即使8764這個Server的狀態是UP,它也不會接收到來自Eureka Client發起的注冊信息。

  Eureka Client向Eureka Server發起注冊信息的過程除了在Eureka Client啟動的時候觸發,還有另外一種方式,就是后台定時任務。

  假設我們上面描述的場景是在Eureka Client啟動的時候,因為在啟動的時候注冊這個過程全部失敗了,當后台定時任務執行時,還會進入該注冊流程。注意此時quarantineSet的值為3(8761、8762、8763之前注冊失敗的Eureka Server)。
所以當程序再次進入getHostCandidates()方法時,if (quarantineSet.isEmpty())這個方法是不滿足的,接下來會走else if (quarantineSet.size() >= threshold)這個判斷,如果這個判斷成立,那么會將quarantineSet集合清空,同時返回全量的Eureka Server列表,如果這個判斷不成立,會拿quarantineSet集合中保存的內容去過濾Eureka Server的全量列表。以本文為例:

  • quarantineSet中保存的是(8761、8762、8763)三個Eureka Server
  • Eureka Server全量列表的內容是(8761、8762、8763、8764)四個Eureka Server過濾后返回的結果為8764這個Eureka Server。

  在本文的例子中8761、8762、8763這三個Eureka Server的狀態是down而8764這個Eureka Server的狀態是UP,我們其實是想走到最后的else分支,從而完成過濾操作,並最終得到8764這個Server,遺憾的是它並不會走到這個分支,而是被上面的else if (quarantineSet.size() >= threshold)這個分支所攔截,返回的依舊是全量的Eureka Server列表。這樣造成的后果就是Eureka Client依舊會依次向(8761、8762、8763)這三個down的Eureka Server發起注冊請求。
那么問題的關鍵在哪里呢?問題的關鍵就是threshold這個值的由來,因為此時quarantineSet.size()的值為3,而3這個值大於threshold,從而導致,會將quarantineSet集合清空,返回全量的Server列表。
  我們知道threshold這個值是根據全量的Eureka Server列表乘以一個可配置的參數計算出來的,在本文的例子當中,我的properties文件中除了defaultZone之外並沒有配置這個參數,那么也就是說這個參數是有默認值的,通過源碼我們了解到,這個默認值是0.66。具體源碼如下:

final class PropertyBasedTransportConfigConstants {
    /**
     *省略部分源碼
     */
    static class Values {
        static final int SESSION_RECONNECT_INTERVAL = 20*60;
        //默認值為0.66
        static final double QUARANTINE_REFRESH_PERCENTAGE = 0.66;
        static final int DATA_STALENESS_TRHESHOLD = 5*60;
        static final int ASYNC_RESOLVER_REFRESH_INTERVAL = 5*60*1000;
        static final int ASYNC_RESOLVER_WARMUP_TIMEOUT = 5000;
        static final int ASYNC_EXECUTOR_THREADPOOL_SIZE = 5;
    }
}
/**
 *@return the percentage of the full endpoints set above which the   
 *quarantine set is cleared in the range [0, 1.0]
 */
double getRetryableClientQuarantineRefreshPercentage();

  看到這里就不難理解了,因為這個值是0.66而此時全量的Eureka Server值為4。計算之后的值為2,而由於注冊的for循環為3次,所以當第二次發起注冊流程的時候quarantineSet的值始終大於threshold。這樣就會導致一個問題,就是如果8761、8762、8763一直是down即使8764一直是好的,那么Eureka Client也不會注冊成功。而且這個參數值的區間為0到1.

  既然通過源碼分析我們找到了問題根源,其實對應的我們也找到了解決這個問題的辦法,就是對應把這個參數值調大些。
這個值在properties中對應的寫法如下:

eureka.client.transport.retryableClientQuarantineRefreshPercentage = xxx

  接下來我們修改下properties文件,修改后的內容如下:

 

eureka.client.service-url.defaultZone=
http://localhost:8761/eureka,http://localhost:8762/eureka,http://localhost:8763/eureka,http://localhost:8764/eureka
eureka.client.transport.retryableClientQuarantineRefreshPercentage=1
eureka.client.service-url.defaultZone=
http://localhost:8761/eureka,http://localhost:8762/eureka,http://localhost:8763/eureka,http://localhost:8764/eureka
eureka.client.transport.retryableClientQuarantineRefreshPercentage=1

接下來按照這個配置再次回顧下上面的流程:

  • Eureka Client啟動時進行注冊(8761、8762、8763的狀態是down),所以此時quarantineSet的值為3.
  • 接下來在定時任務中又觸發注冊事件,此時因為參數的值從0.66調整為1。所以計算出的threshold的值為4。而此時quarantineSet的值為3。所以不會進入到else if (quarantineSet.size() >= threshold)分支,而是會進入最后的esle分支。
  • 在else分支中會完成過濾功能,最終返回的list中的結果只有一個就是8764這個Eureka Server。
  • Eureka Client向8764這個Eureka Server發起注冊請求,得到成功相應,並返回。

遺留問題

  說道這里我們感覺好像是解決了這個問題,那么問一個問題,這個參數值可以設置的無限大嗎?
  比如我將這個參數值設置為10,雖然javaDoc中說明這個參數值的范圍在0-1之間,但是並沒有說明如果將這個參數調整大於1會出現什么情況。接下來按照上面的流程我們分析下:
  之前我們分析的流程中的前提是8761、8762、8763這三台Server的狀態是down而8764這個server的狀態是up,現在我們修改下這個前提。
  假設一開始8761、8762、8763、8764這四台Eureka Server的狀態都是down。
  Eureka Client啟動時進行注冊(8761、8762、8763的狀態是down),所以此時quarantineSet的值為3.

  • 接下來在定時任務中又觸發注冊事件,此時因為參數的值從0.66調整為10。所以計算出的threshold的值為40。而此時quarantineSet的值為3。所以不會進入到else if (quarantineSet.size() >= threshold)分支,而是會進入最后的esle分支。
  • 在else分支中會完成過濾功能,最終返回的list中的結果只有一個就是8764這個Eureka Server。
  • Eureka Client向8764這個Eureka Server發起注冊請求,因為此時8764的狀態也是down導致注冊失敗,此時quarantineSet中的內容是(8761、8762、8763、8764)
  • 當定時任務再次觸發時if (quarantineSet.isEmpty())這個分支不會進入,因為此時quarantineSet的值為4else if (quarantineSet.size() >= threshold)這分支也不會進入因為threshold的值為40
  • 最終會進入else分支,這個分支原本的含義是想通過quarantineSet來充當過濾器,從全量的Eureka Server中過濾掉之前狀態為down的Eureka Server,但是由於quarantineSet的值現在已經是全量,導致過濾后的結果返回的是一個空的list。即使此時Eureka Server列表(8761、8762、8763、8764)任何一個Server的狀態變為UP,該Eureka Client也不可能完成注冊事件。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM