Eureka系列(三)獲取服務Client端具體實現


獲取服務Client 端流程

  我們先看下面這張圖片,這張圖片簡單描述了下我們Client是如何獲取到Server已續約實例信息的流程:Eureka獲取實例信息Client端實現.jpg  從圖片中我們可以知曉大致流程就是Client會自己開啟一個定時任務,然后根據不同的條件去調用Server端接口得到所有已續約服務的信息,然后合並到自己的緩存數據中。下面我們詳情了解下上述流程在源碼中的具體實現。


獲取服務Client端源碼分析

  我們先來看看服務獲取定時任務的初始化。那我們的服務獲取定時任務什么時候會被初始化呢,那肯定是我們啟用我們Eureka Client的時候唄,當我們啟動Client時,Eureka會先處理相關的配置,然后初始化我們Client的相關信息,我們的定時任務也就是此時進行的初始化,更具體地說我們的服務續約定時任務就是在DiscoveryClient這個類中initScheduledTasks方法中被初始化的。具體代碼如下:

private final ScheduledExecutorService scheduler;
private void initScheduledTasks() {
    …省略其他代碼
    // 初始化定時拉取服務注冊信息
    scheduler.schedule(
        new TimedSupervisorTask(
                "cacheRefresh",
                scheduler,
                cacheRefreshExecutor,
                registryFetchIntervalSeconds,
                TimeUnit.SECONDS,
                expBackOffBound,
                new CacheRefreshThread()
        ),
        registryFetchIntervalSeconds, TimeUnit.SECONDS);
     …省略其他代碼
}

  由此可見,我們的定時任務其實是Client進行初始化完成的,並且還是使用ScheduledExecutorService線程池來完成我們的定時任務。我們下面就看看CacheRefreshThread這個類是如何實現獲取服務的流程:

class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }

  不多說,我們接着看refreshRegistry()方法:

 @VisibleForTesting
    void refreshRegistry() {
            …省略部分代碼
            boolean success = fetchRegistry(remoteRegionsModified); //  獲取實例信息
            if (success) {
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }
            …省略部分代碼
    }

  這里不做太多解釋,我們接着看fetchRegistry()方法:

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    try {
        // If the delta is disabled or if it is the first time, get all
        // applications
        Applications applications = getApplications();

        //1. 是否禁用增量更新;
        //2. 是否對某個region特別關注;
        //3. 外部調用時是否通過入參指定全量更新;
        //4. 本地還未緩存有效的服務列表信息;
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
            logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
            logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
            logger.info("Application is null : {}", (applications == null));
            logger.info("Registered Applications size is zero : {}",
                    (applications.getRegisteredApplications().size() == 0));
            logger.info("Application version is -1: {}", (applications.getVersion() == -1));
            getAndStoreFullRegistry(); // 全量更新
        } else {
            getAndUpdateDelta(applications); // 增量更新
        }
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to refresh its cache! status = {}", appPathIdentifier, e.getMessage(), e);
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }
    // Notify about cache refresh before updating the instance remote status
    onCacheRefreshed();
    // Update remote status based on refreshed data held in the cache
    updateInstanceRemoteStatus();
    // registry was fetched successfully, so return true
    return true;
}

  由此可見,fetchRegistry 方法主要是判斷我們獲取實例信息是進行全量更新還是增量更新。如果條件成立,則我們會進行全量更新,否則則進行批量更新。下面我們簡單介紹下getAndStoreFullRegistry() 全量更新、getAndUpdateDelta(applications)批量更新方法:

// 全量更新
private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();
    logger.info("Getting all instance registry info from the eureka server");

    Applications apps = null;
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            // 調用服務端接口得到全量的實例信息
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) 
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        // 將實例信息存進Applications
        apps = httpResponse.getEntity(); 
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());

    if (apps == null) {
        logger.error("The application is null for some reason. Not storing this information");
    } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        localRegionApps.set(this.filterAndShuffle(apps));
        logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
    } else {
        logger.warn("Not updating applications as another thread is updating it already");
    }
}

   getAndStoreFullRegistry() 簡單來說就是通過調用Eureka服務端提供的http接口獲取到全部實例信息,然后將實例信息存進我們的Applications中。

// 批量更新
private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications delta = null;
     // 得到增量的實例信息
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        delta = httpResponse.getEntity();
    }

    if (delta == null) { // 如果增量信息為空,則進行一次全量更新
        logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                + "Hence got the full registry.");
        getAndStoreFullRegistry();
    } 
        //考慮到多線程同步問題,這里通過CAS來確保請求發起到現在是線程安全的,
        //如果這期間fetchRegistryGeneration變了,就表示其他線程也做了類似操作,因此放棄本次響應的數據
    else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                // 合並增量實例信息
                updateDelta(delta); 
                // 用合並了增量數據之后的本地數據來生成一致性哈希碼
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {
            logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
        }
        // There is a diff in number of instances for some reason
         //Eureka server在返回增量更新數據時,也會返回服務端的一致性哈希碼,
         //理論上每次本地緩存數據經歷了多次增量更新后,計算出的一致性哈希碼應該是和服務端一致的,
         //如果發現不一致,就證明本地緩存的服務列表信息和Eureka server不一致了,需要做一次全量更新
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { 
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {
        logger.warn("Not updating application delta as another thread is updating it already");
        logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
    }
}

  getAndUpdateDelta 方法簡單來說,就是首先調用Eureka服務端提供的增量信息接口得到增量實例信息,然后進行判斷,如果增量為null,為了數據准確性,調用一下全量更新實例接口更新實例信息。如果增量信息不為空,則進行一個cas處理,如果成功,則進行增量信息的合並。最后會再進行一次判斷,判斷從服務端得到的批量實例信息計算得到的HashCode是否和從服務端得到的實例信息HashCode值是否相等,如果不相等則會調用reconcileAndLogDifference 方法,再次進行全量更新實例信息。下面我們就簡單看下reconcileAndLogDifference這個方法:

private void reconcileAndLogDifference(Applications delta, String reconcileHashCode) throws Throwable {
    logger.debug("The Reconcile hashcodes do not match, client : {}, server : {}. Getting the full registry",
            reconcileHashCode, delta.getAppsHashCode());
    RECONCILE_HASH_CODES_MISMATCH.increment();
    long currentUpdateGeneration = fetchRegistryGeneration.get();
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            // 調用服務端接口得到全量的實例信息
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) 
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    Applications serverApps = httpResponse.getEntity();
    if (serverApps == null) {
        logger.warn("Cannot fetch full registry from the server; reconciliation failure");
        return;
    }
    if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        localRegionApps.set(this.filterAndShuffle(serverApps));
        getApplications().setVersion(delta.getVersion());
        logger.debug(
                "The Reconcile hashcodes after complete sync up, client : {}, server : {}.",
                getApplications().getReconcileHashCode(),
                delta.getAppsHashCode());
    } else {
        logger.warn("Not setting the applications map as another thread has advanced the update generation");
    }
}

   由此可見,reconcileAndLogDifference方法和我們getAndStoreFullRegistry方法調用的接口一樣,都是調用Eureka服務端提供的全量實例信息接口,然后更新我們自己的實例信息。

總的來說,獲取服務Client端的執行流程就可以分為以下兩步:
   1.初始化一個定時任務,默認30s
   2.定時任務中根據不同的情況調用不同的方法來更新本地實例緩存信息

在定時任務中獲取實例信息我們也可以分為以下幾步:
   1.判斷是否需要全量更新
   2.條件成立則進行全量更新
     a.將全量更新數據更新到本地緩存中
  3.條件不成立則進行批量更新
     a.判斷批量更新數據是否為空,是則進行一次全量更新
     b.將批量更新數據合並到本地緩存中
     c.判斷批量更新數據計算得到的HashCode是否和服務端傳過來的HashCode相等,如果不相等,說明數據有問題,需要進行一次全量更新


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM