SpringCloud學習筆記(三、SpringCloud Netflix Eureka)


目錄:

  • 服務發現簡介
  • SpringCloud Netflix Eureka應用
  • Eureka高可用
  • Eureka源碼分析 >>> Eureka Client初始化(客戶端定時獲取服務列表、客戶端定時發送心跳續約、客戶端定時注冊)源碼分析、服務下線源碼分析

服務發現簡介:

1、什么是服務發現

程序通過一個標識獲取服務列表,且這個服務列表能夠跟隨服務的狀態而動態變更

2、服務發現的兩種模式

)客戶端模式:調用微服務時,首先到注冊中心獲取服務列表,然后再根據調用本地的負載均衡策略進行服務調用,並且本地會緩存一份服務列表

)服務端模式:調用端直接向注冊中心發起請求,注冊中心再通過自身的負載均衡策略進行服務調用調用端自身是不需要維護服務發現邏輯

3、客戶端、服務端兩種模式的比較

客戶端

a、獲取列表為周期性,在調用上減少了一次鏈路,但每個客戶端都需要維護獲取服務列表的邏輯

b、可用性高,因為本地緩存了一份服務列表的原因,所以即使注冊中心出現故障了也不會影響客戶端的正常使用

c、服務端上下線先會對客戶端有一定的影響,會出現短暫的調用失敗

服務端

a、簡單,客戶端不需要維護獲取服務列表的邏輯

b、可用性由服務管理者覺定,若服務管理者發送故障則所有的客戶端將不可用;同時,所有的調用及存儲都有服務管理者來完成,這樣服務管理者可能會負載過高

c、服務端上下線客戶端無感知

SpringCloud Netflix Eureka應用:

1、Eureka服務端

)添加Eureka Server依賴

1 <dependency>
2     <groupId>org.springframework.cloud</groupId>
3     <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
4 </dependency>

)啟動類加上@EnableEurekaServer注解

)配置properties

 1 ## Eureka注冊中心實例名
 2 spring.application.name=eureka-server
 3 ## Eureka注冊中心端口
 4 server.port=9090
 5 
 6 ## 關閉Actuator驗證開關
 7 management.security.enabled=false
 8 
 9 ## 不向注冊中心獲取服務列表
10 ## eureka服務端的eureka.client.fetch-registry配置必須為false讓他不要去找server節點,因為它本身就是server節點
11 eureka.client.fetch-registry=false
12 ## 不注冊到注冊中心上
13 eureka.client.register-with-eureka=false
14 
15 ## 配置 注冊中心的 地址
16 eureka.client.service-url.defaultZone=http://localhost:9090/eureka

啟動后可通過http://localhost:9090/eureka查看服務端情況

2、Eureka客戶端

)添加Eureka Client依賴

1 <dependency>
2     <groupId>org.springframework.cloud</groupId>
3     <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
4 </dependency>

)啟動類加上@EnableEurekaClient或@EnableDiscoveryClient

二者的共同點是:都是能夠讓注冊中心能夠發現,掃描到該服務。

不同點:@EnableEurekaClient只適用於Eureka作為注冊中心,而@EnableDiscoveryClient可以是其他的注冊中心。

)配置properties

 1 ## Eureka服務提供者實例名
 2 spring.application.name=eureka-provider
 3 ## Eureka服務提供者端口
 4 server.port=8070
 5 
 6 ## 關閉Actuator驗證開關
 7 management.security.enabled=false
 8 
 9 ## 配置 注冊中心的 地址
10 eureka.client.service-url.defaultZone=http://localhost:9090/eureka/

3、一些常用的配置

)客戶端配置

)服務實例配置

Eureka高可用:

我們都知道Eureka分為服務端和客戶端,所以搭建高可用的Eureka時二者都需要高可用。

1、服務端

服務端的高可用其實就是讓客戶端獲取服務列表時盡可能的少失敗,所以我們只需要啟動兩個Eureka Server,讓他們相互復制服務列表即可

1 server.port=9091
2 eureka.client.service-url.defaultZone=http://localhost:9092/eureka
1 server.port=9092
2 eureka.client.service-url.defaultZone=http://localhost:9091/eureka

2、客戶端

客戶端的高可用就是在獲取服務列表時盡可能的少失敗,所以我們只需要配置多個注冊中心即可

1 eureka.client.service-url.defaultZone=http://localhost:9091/eureka,http://localhost:9092/eureka

Eureka Client初始化: 

首先我們知道若想將一個應用程序注冊為Eureka的客戶端那最主要的便是在啟動類上加上@EnableDiscoveryClient這個注解,這便是Eureka Client的初始化。

閱讀目的:為什么加上@EnableDiscoveryClient注解后且配置eureka.client.service-url.defaultZone=http后,就能將此服務注冊到eureka,且能夠讓其它注冊的服務發現此服務。

1、首先以@EnableDiscoveryClient注解來分析

從注解的字面意思來看就是啟動DiscoveryClient,我們來大膽的猜測下 φ(>ω<*) 

emmmmm,程序中應該有DiscoveryClient這個類吧,Ctrl + n,(⊙o⊙)…果真有這個類!

根據IDEA的檢索結果,發現滿足條件的有兩個,一個是com.netflix.discovery.DiscoveryClient,一個是org.springframework.cloud.client.discovery.DiscoveryClient

看源碼嘛,肯定先看結構咯~~~

com.netflix.discovery.DiscoveryClient

org.springframework.cloud.client.discovery.DiscoveryClient是一個接口,其實現有很多,但Eureka的是org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient

所以兩個DiscoveryClient的邏輯結構便如下圖:

2、了解結構后,我們再略讀下這兩個DiscoveryClient的實現

)首先從SpringCloud的看起,實現類org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient

a、通過略讀可以看出比較重要的public List<ServiceInstance> getInstances(String serviceId)、public List<String> getServices()兩個方法都有使用一個eurekaClient的屬性

b、而eurekaClient正是,Netflix的EurekaClient接口,所以我們可以得知SpringCloud應該僅是對Netflix的一個包裝

c、所以我們直接看Netflix的EurekaClient接口(com.netflix.discovery.EurekaClient)的實現 >>> com.netflix.discovery.DiscoveryClient

com.netflix.discovery.DiscoveryClient的實現

a、一般看類的實現是從構造函數入手,所以我們先找到最全的構造函數:DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider);

b、此構造一開始是大量的判斷,這塊就不看了,我們僅看最重要那部分(正常邏輯下,大量判斷中的代碼邏輯不是對bug的處理就是特殊情況的處理,所以我們先看那些不在if中的代碼或者是少量if的代碼) >>> initScheduledTasks()

c、initScheduledTasks():初始化定時器,其主要分為三大塊 >>> 客戶端定時獲取服務列表、客戶端定時發送心跳續約、客戶端定時注冊

客戶端定時獲取服務列表源碼分析:

源碼示例(客戶端):

 1 if (clientConfig.shouldFetchRegistry()) {
 2     // registry cache refresh timer
 3     int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
 4     int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
 5     scheduler.schedule(
 6         new TimedSupervisorTask(
 7             "cacheRefresh",
 8             scheduler,
 9             cacheRefreshExecutor,
10             registryFetchIntervalSeconds,
11             TimeUnit.SECONDS,
12             expBackOffBound,
13             new CacheRefreshThread() // 定時獲取服務列表thread
14         ),
15         registryFetchIntervalSeconds, TimeUnit.SECONDS);
16 }

1、首先我們看第1行,clientConfig.shouldFetchRegistry() == true才會執行獲取服務列表的job,我們點進去看,發現其實就是我們properties配置的eureka.client.fetch-registry,而默認值為true。

2、然后執行job的周期單位為秒(11行),執行周期為registryFetchIntervalSeconds,也就是第3行;第3行和第1行同理,為properties配置的eureka.client.registry-fetchInterval-seconds,而默認值為30

3、最后我們看看其核心線程(13行),可以看到其調用的函數其實是void refreshRegistry();函數最開始一大堆判斷,最后一堆debug,這些都不用細究,我們直接看最核心的哪行代碼boolean success = fetchRegistry(remoteRegionsModified)

 1 private boolean fetchRegistry(boolean forceFullRegistryFetch) {
 2     Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
 3 
 4     try {
 5         // If the delta is disabled or if it is the first time, get all
 6         // applications
 7         Applications applications = getApplications();
 8 
 9         if (clientConfig.shouldDisableDelta()
10                 || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
11                 || forceFullRegistryFetch
12                 || (applications == null)
13                 || (applications.getRegisteredApplications().size() == 0)
14                 || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
15         {
16             logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
17             logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
18             logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
19             logger.info("Application is null : {}", (applications == null));
20             logger.info("Registered Applications size is zero : {}",
21                     (applications.getRegisteredApplications().size() == 0));
22             logger.info("Application version is -1: {}", (applications.getVersion() == -1));
23             getAndStoreFullRegistry();
24         } else {
25             getAndUpdateDelta(applications);
26         }
27         applications.setAppsHashCode(applications.getReconcileHashCode());
28         logTotalInstances();
29     } catch (Throwable e) {
30         logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
31         return false;
32     } finally {
33         if (tracer != null) {
34             tracer.stop();
35         }
36     }
37 
38     // Notify about cache refresh before updating the instance remote status
39     onCacheRefreshed();
40 
41     // Update remote status based on refreshed data held in the cache
42     updateInstanceRemoteStatus();
43 
44     // registry was fetched successfully, so return true
45     return true;
46 }

從代碼中我們可以看出要么是全量注冊(23行)要么是增量注冊(25行):

)全量注冊

 1 private void getAndStoreFullRegistry() throws Throwable {
 2     long currentUpdateGeneration = fetchRegistryGeneration.get();
 3 
 4     logger.info("Getting all instance registry info from the eureka server");
 5 
 6     Applications apps = null;
 7     EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
 8             ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
 9             : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
10     if (httpResponse.getStatusCode() == Response.Status.OK.getStatusCode()) {
11         apps = httpResponse.getEntity();
12     }
13     logger.info("The response status is {}", httpResponse.getStatusCode());
14 
15     if (apps == null) {
16         logger.error("The application is null for some reason. Not storing this information");
17     } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
18         localRegionApps.set(this.filterAndShuffle(apps));
19         logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
20     } else {
21         logger.warn("Not updating applications as another thread is updating it already");
22     }
23 }

)從中可以看出18行將apps從httpResponse獲取,所以服務列表應該是從服務端獲取的;故看下http調用,第8行調到其實現類com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient,看到其調用的http url是apps/

1 @Override
2 public EurekaHttpResponse<Applications> getApplications(String... regions) {
3     return getApplicationsInternal("apps/", regions);
4 }

再點進去看,發現其實就是調用了http://服務端ip:服務端port/eureka/apps(GET請求),並且將結果放入Applications

)增量注冊,增量注冊與全量同理,但調用的是http://服務端ip:服務端port/eureka/apps/delta接口

全量增量注冊講完后我們來看看服務端的代碼(着重看全量,增量與全量原理差不多)

源碼示例(服務端):

我們知道全量注冊調用的是http://服務端ip:服務端port/eureka/apps接口,我們找下這個接口的實現,com.netflix.eureka.resources.ApplicationsResource#getContainers,並找到最重要的代碼塊

 1 Response response;
 2 if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
 3     response = Response.ok(responseCache.getGZIP(cacheKey))
 4             .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
 5             .header(HEADER_CONTENT_TYPE, returnMediaType)
 6             .build();
 7 } else {
 8     response = Response.ok(responseCache.get(cacheKey))
 9             .build();
10 }
11 return response;

進入responseCache.getZIP(cacheKey)后,我們可以知道代碼很簡單,就是從緩存中獲取數據給客戶端。

參數useReadOnlyCache也是客戶端配置的,默認為true;6-12行很簡單,緩存有取緩存,沒有則從readWriteCacheMap拿到后再放入緩存。

 1 @VisibleForTesting
 2 ResponseCacheImpl.Value getValue(final Key key, boolean useReadOnlyCache) {
 3     ResponseCacheImpl.Value payload = null;
 4     try {
 5         if (useReadOnlyCache) {
 6             final ResponseCacheImpl.Value currentPayload = readOnlyCacheMap.get(key);
 7             if (currentPayload != null) {
 8                 payload = currentPayload;
 9             } else {
10                 payload = readWriteCacheMap.get(key);
11                 readOnlyCacheMap.put(key, payload);
12             }
13         } else {
14             payload = readWriteCacheMap.get(key);
15         }
16     } catch (Throwable t) {
17         logger.error("Cannot get value for key :" + key, t);
18     }
19     return payload;
20 }

客戶端定時發送心跳續約:

 1 scheduler.schedule(
 2         new TimedSupervisorTask(
 3                 "heartbeat",
 4                 scheduler,
 5                 heartbeatExecutor,
 6                 renewalIntervalInSecs,
 7                 TimeUnit.SECONDS,
 8                 expBackOffBound,
 9                 new HeartbeatThread()
10         ),
11         renewalIntervalInSecs, TimeUnit.SECONDS);

簡單的地方我們就不一一看了,直接進入重點,發現第9行就是不斷刷新lastSuccessfulHeartbeatTimestamp使之為當前時間戳(永不過期)

1 if (renew()) {
2     lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
3 }

那我們再看看更新lastSuccessfulHeartbeatTimestamp的條件renew唄!

查看renew后可以得知,心跳續約調用了客戶端的/apps/appName/id接口(PUT請求);然后我們卡卡客戶端實現

1、首先接口在com.netflix.eureka.resources.InstanceResource#renewLease

2、我們想其核心代碼boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode)里跟進

 1 public boolean renew(String appName, String id, boolean isReplication) {
 2     RENEW.increment(isReplication);
 3     // 還是一樣的從registry中拿InstanceInfo
 4     Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
 5     Lease<InstanceInfo> leaseToRenew = null;
 6     if (gMap != null) {
 7         leaseToRenew = gMap.get(id);
 8     }
 9     // 如果拿不到InstanceInfo就表示服務掛了,心跳續約失敗
10     if (leaseToRenew == null) {
11         RENEW_NOT_FOUND.increment(isReplication);
12         logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id);
13         return false;
14     } else {
15         InstanceInfo instanceInfo = leaseToRenew.getHolder();
16         if (instanceInfo != null) {
17             // touchASGCache(instanceInfo.getASGName());
18             InstanceInfo.InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(
19                     instanceInfo, leaseToRenew, isReplication);
20             if (overriddenInstanceStatus == InstanceInfo.InstanceStatus.UNKNOWN) {
21                 logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}"
22                         + "; re-register required", instanceInfo.getId());
23                 RENEW_NOT_FOUND.increment(isReplication);
24                 return false;
25             }
26             if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) {
27                 Object[] args = {
28                         instanceInfo.getStatus().name(),
29                         instanceInfo.getOverriddenStatus().name(),
30                         instanceInfo.getId()
31                 };
32                 logger.info(
33                         "The instance status {} is different from overridden instance status {} for instance {}. "
34                                 + "Hence setting the status to overridden status", args);
35                 instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus);
36             }
37         }
38         renewsLastMin.increment();
39         // 如果能拿到InstanceInfo就做一個續約
40         leaseToRenew.renew();
41         return true;
42     }
43 }

3、進一步看下續約leaseToRenew.renew()方法

1 public void renew() {
2     lastUpdateTimestamp = System.currentTimeMillis() + duration;
3 }

代碼很簡單,就是延長lastUpdateTimestamp的時間,duration則是通過構造傳入的;若duration有執行則用構造指定的,若沒有默認90秒

客戶端定時注冊:

1 instanceInfoReplicator = new InstanceInfoReplicator(
2         this,
3         instanceInfo,
4         clientConfig.getInstanceInfoReplicationIntervalSeconds(),
5         2); // burstSize

進入InstanceInfoReplicator后會發現這個類實現Runnable接口,那既然是線程就去看run方法咯

 1 public void run() {
 2     try {
 3         discoveryClient.refreshInstanceInfo();
 4 
 5         Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
 6         if (dirtyTimestamp != null) {
 7             discoveryClient.register();
 8             instanceInfo.unsetIsDirty(dirtyTimestamp);
 9         }
10     } catch (Throwable t) {
11         logger.warn("There was a problem with the instance info replicator", t);
12     } finally {
13         Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
14         scheduledPeriodicRef.set(next);
15     }
16 }

然后我們看第7行,一步步根進去發現其實調用了"apps/" + info.getAppName()接口

接下來我們來看看"apps/" + info.getAppName()接口的實現

1、找到com.netflix.eureka.resources.ApplicationResourceaddInstance方法,找到其中的registry.register(info, "true".equals(isReplication));根進去

2、找到com.netflix.eureka.registry.AbstractInstanceRegistry的register方法

 1 public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
 2     try {
 3         read.lock();
 4         // 根據實例名registrant.getAppName()獲取InstanceInfo
 5         Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
 6         REGISTER.increment(isReplication);
 7         if (gMap == null) {
 8             final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
 9             // 若registry沒有此實例時,注冊一個空的示例
10             gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
11             if (gMap == null) {
12                 gMap = gNewMap;
13             }
14         }
15         Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
16         // Retain the last dirty timestamp without overwriting it, if there is already a lease
17         if (existingLease != null && (existingLease.getHolder() != null)) {
18             Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
19             Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
20             logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
21 
22             // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted
23             // InstanceInfo instead of the server local copy.
24             // 若eureka服務器存在的示例的時間戳大於傳入新實例的時間戳,則用已存在的(說明eureka server使用的實例都是最新的)
25             if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
26                 logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
27                         " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
28                 logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
29                 registrant = existingLease.getHolder();
30             }
31         } else {
32             // The lease does not exist and hence it is a new registration
33             synchronized (lock) {
34                 if (this.expectedNumberOfRenewsPerMin > 0) {
35                     // Since the client wants to cancel it, reduce the threshold
36                     // (1
37                     // for 30 seconds, 2 for a minute)
38                     this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
39                     this.numberOfRenewsPerMinThreshold =
40                             (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
41                 }
42             }
43             logger.debug("No previous lease information found; it is new registration");
44         }
45         // 根據新的示例創建新的租約
46         Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
47         if (existingLease != null) {
48             lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
49         }
50         gMap.put(registrant.getId(), lease);
51         synchronized (recentRegisteredQueue) {
52             recentRegisteredQueue.add(new Pair<Long, String>(
53                     System.currentTimeMillis(),
54                     registrant.getAppName() + "(" + registrant.getId() + ")"));
55         }
56         // This is where the initial state transfer of overridden status happens
57         if (!InstanceInfo.InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
58             logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
59                     + "overrides", registrant.getOverriddenStatus(), registrant.getId());
60             if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
61                 logger.info("Not found overridden id {} and hence adding it", registrant.getId());
62                 overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
63             }
64         }
65         InstanceInfo.InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
66         if (overriddenStatusFromMap != null) {
67             logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
68             registrant.setOverriddenStatus(overriddenStatusFromMap);
69         }
70 
71         // Set the status based on the overridden status rules
72         InstanceInfo.InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
73         registrant.setStatusWithoutDirty(overriddenInstanceStatus);
74 
75         // If the lease is registered with UP status, set lease service up timestamp
76         if (InstanceInfo.InstanceStatus.UP.equals(registrant.getStatus())) {
77             lease.serviceUp();
78         }
79         registrant.setActionType(InstanceInfo.ActionType.ADDED);
80         recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(lease));
81         registrant.setLastUpdatedTimestamp();
82         invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
83         logger.info("Registered instance {}/{} with status {} (replication={})",
84                 registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
85     } finally {
86         read.unlock();
87     }
88 }

服務下線: 

若要將此客戶端下線的話,要分兩步走

)啟動下線接口(配置properties)

1 # 啟用下線功能
2 endpoints.shutdown.enabled=true
3 # 關閉下線功能的安全校驗
4 endpoints.shutdown.sensitive=false

)調用下線接口(http://當前客戶端ip/當前客戶端port/shutdown)

1、客戶端

客戶端代碼很簡單,主要分為兩步

)cancelScheduledTasks() >>> 停止客戶端初始化的三個job(客戶端定時獲取服務列表、客戶端定時發送心跳續約、客戶端定時注冊)

)unregister() >>> 注銷(調用apps/appName/id接口,delete方式)

2、服務端

首先我們來找到客戶端調的那個接口:com.netflix.eureka.resources.InstanceResource#cancelLease

 1 @DELETE
 2 public Response cancelLease(@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
 3     boolean isSuccess = registry.cancel(app.getName(), id, "true".equals(isReplication));
 4 
 5     if (isSuccess) {
 6         logger.debug("Found (Cancel): " + app.getName() + " - " + id);
 7         return Response.ok().build();
 8     } else {
 9         logger.info("Not Found (Cancel): " + app.getName() + " - " + id);
10         return Response.status(Status.NOT_FOUND).build();
11     }
12 }

我們看核心代碼(第3行),並找到方法的實現com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl#cancel

 1 @Override
 2 public boolean cancel(final String appName, final String id, final boolean isReplication) {
 3     if (super.cancel(appName, id, isReplication)) {
 4         replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Cancel, appName, id, null, null, isReplication);
 5         synchronized (lock) {
 6             if (this.expectedNumberOfRenewsPerMin > 0) {
 7                 // Since the client wants to cancel it, reduce the threshold (1 for 30 seconds, 2 for a minute)
 8                 this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin - 2;
 9                 this.numberOfRenewsPerMinThreshold =
10                         (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
11             }
12         }
13         return true;
14     }
15     return false;
16 }

看第三行,然后一步步跟進后找到其底層代碼:

 1 protected boolean internalCancel(String appName, String id, boolean isReplication) {
 2     try {
 3         read.lock();
 4         CANCEL.increment(isReplication);
 5         // 還是同樣的從registry中拿取數據
 6         Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
 7         Lease<InstanceInfo> leaseToCancel = null;
 8         if (gMap != null) {
 9             // 若拿到數據則移除該數據
10             leaseToCancel = gMap.remove(id);
11         }
12         synchronized (recentCanceledQueue) {
13             recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
14         }
15         InstanceInfo.InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
16         if (instanceStatus != null) {
17             logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
18         }
19         if (leaseToCancel == null) {
20             CANCEL_NOT_FOUND.increment(isReplication);
21             logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
22             return false;
23         } else {
24             // 將此InstanceInfo移除
25             leaseToCancel.cancel();
26             InstanceInfo instanceInfo = leaseToCancel.getHolder();
27             String vip = null;
28             String svip = null;
29             if (instanceInfo != null) {
30                 instanceInfo.setActionType(InstanceInfo.ActionType.DELETED);
31                 recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(leaseToCancel));
32                 instanceInfo.setLastUpdatedTimestamp();
33                 vip = instanceInfo.getVIPAddress();
34                 svip = instanceInfo.getSecureVipAddress();
35             }
36             invalidateCache(appName, vip, svip);
37             logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
38             return true;
39         }
40     } finally {
41         read.unlock();
42     }
43 }

最后服務下單其實就是調用leaseToCancel.cancel(),通過更新evictionTimestamp來取消租賃

1 public void cancel() {
2     if (evictionTimestamp <= 0) {
3         evictionTimestamp = System.currentTimeMillis();
4     }
5 }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM