一、Eureka的基础架构及服务治理机制
Eureka服务治理的基础架构包含三个核心:服务注册中心、服务提供者、服务消费者。其中服务注册中心,即Eureka提供的服务端,提供服务注册和发现的功能;服务提供者,即将自己的服务注册到注册中心;服务的消费者,从注册中心获取服务列表,从而使消费者知道到何处调用服务,服务消费可以使用Ribbon、Feign等。
1、服务提供者:
(1)服务注册:服务提供者在项目启动时,会通过发送REST请求的方式将自己注册到eureka server上,同时带上一些自己的元数据,Eureka Server收到请求后,将元数据存储在一个双层map中,第一层的key是服务名称,第二层的key是具体的服务实例。
(2)服务同步:如果A项目将服务注册到了M注册中心,B项目将服务注册到N注册中心,但是如果M项目和N项目开启了可以注册为服务的配置,那么当A项目将服务注册到M注册中心时,M注册中心会将请求转发到N注册中心,以保证两个注册中心副本中服务同步。
(3)服务续约:在注册完服务后,服务提供者会维护一个心跳来持续告诉注册中心其还活着,以防止注册中心的剔除任务将该服务实例从服务列表中删除。
关于心跳频率与剔除任务认为服务失效时间的配置参数如下所示(配置值均为默认值):
eureka:
instance:
# 心跳检测频率
lease-renewal-interval-in-seconds: 30 # 服务失效时间 lease-expiration-duration-in-seconds: 90
2、服务消费者:
(1)获取服务:当启动服务消费者项目时,会向注册中心发送一个REST请求来获取注册中心上注册的服务清单。为了性能的考虑,注册中心自己维护了一份只读的注册服务清单,每30秒更新一次,要调整注册中心中注册服务清单更新频率,可以使用如下参数进行设置(下面示例为默认值),同时,由于获取服务是服务消费的基础,因此需要保证eureka.client.fetch-registry为true
eureka:
client:
registry-fetch-interval-seconds: 30 fetch-registry: true
(2)服务调用:服务消费者在获取到服务提供清单后,会根据服务名获得具体的实例名和该实例的元数据,然后客户端可以根据自己需要,选择调用哪个实例,在上述代码样例中,我们使用的是Ribbon来做负载均衡,而ribbon默认采用轮询的方式进行调用,从而实现客户端的负载。对于访问实例的选择,Eureka中有Region和Zone的概念,一个Region中可以包含多个Zone,一个客户端会被注册到一个Zone中,所以一个客户端只对应一个Zone和一个Region,在服务调用时,优先访问处于同一个Zone中的服务提供者,若访问不到,再访问其他Zone中的服务提供者。
(3)服务下线:当客户端实例进行正常的关闭操作时,它会触发一个服务下线的REST请求给注册中心,告诉注册中心其要下线,注册中心收到请求后,将该服务状态置为下线,并把该事件传播出去。
3、服务注册中心
(1)失效剔除:有时服务实例并不会正常下线,可能是由于内存溢出、网络故障等原因使得服务不能正常运行,所以注册中心并未收到服务下线的请求。为了剔除该类不可用服务提供者实例,Eureka Server在启动时,会创建一个定时任务,每隔一段时间(默认60秒)将当前清单中超时(默认90秒)没有续约的服务剔除出去。
(2)自我保护:前面提到过,服务提供者启动后,会维护一个心跳,定时向注册中心发送心跳,告诉注册中心自己还活着。注册中心的运行期间,会统计心跳失败的比例在15分钟内是否低于85%,如果低于85%,注册中心会将该服务的实例保护起来,不让其过期,但是由于在本地测试,所以这个情况非常容易满足(而线上则主要是由于网络不稳定等导致),这就导致在保护期间内,如果服务提供者实例出现问题,那么客户端就会拿到有问题的实例,将会出现调用失败的情况,因此客户端必须要有容错机制,比如说请求重试、断路器等机制。如果我们想关闭自我保护机制,可以使用如下参数。
eureka:
server:
enable-self-preservation: false
在我们没有关闭自我保护之前,当我们在之前访问注册中心时:http://localhost:1112/,会看到红色警告(警告内容如下图所示),这就是触发了Eureka Server的自我保护机制。
二、Eureka源码分析
1、服务注册中心的加载
首先从服务提供者开始看,例如eureka-client项目,我们主要是在主函数上添加了@EnableDiscoveryClient注解,以及在配置文件中添加了注册中心地址等配置信息,那么源码入口就可以从@EnableDiscoveryClient注解开始看,从该注解我们可以了解,主要是开启DiscoveryClient,那么全局搜索DiscoveryClient,可以发现有两个,一个是springCloud提供的接口org.springframework.cloud.client.discovery.DiscoveryClient,另外一个是netflix的实现com.netflix.discovery.DiscoveryClient,可以看下类的依赖关系,SpringCloud提供的接口类依赖关系及 netflix的实现类的类依赖关系分别如以下左右两张图:
而org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient又使用了com.netflix.discovery.EurekaClient,所以,总的依赖关系如下:
org.springframework.cloud.client.discovery.DiscoveryClient提供了Springcloud服务注册相关的接口,而org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient是netflix公司对于该接口的实现,而该实现,是包装了netflix公司开源项目中的com.netflix.discovery.EurekaClient接口及实现com.netflix.discovery.DiscoveryClient。
可以看调用链,先调用了com.netflix.discovery.DiscoveryClient#getEurekaServiceUrlsFromConfig方法
/**
* @deprecated use {@link #getServiceUrlsFromConfig(String, boolean)} instead.
*/
@Deprecated
public static List<String> getEurekaServiceUrlsFromConfig(String instanceZone, boolean preferSameZone) { return EndpointUtils.getServiceUrlsFromConfig(staticClientConfig, instanceZone, preferSameZone); }
可以看到该方法已经过期,被使用@link到了替代方法com.netflix.discovery.DiscoveryClient#getServiceUrlsFromConfig
@Deprecated
@Override
public List<String> getServiceUrlsFromConfig(String instanceZone, boolean preferSameZone) { return EndpointUtils.getServiceUrlsFromConfig(clientConfig, instanceZone, preferSameZone); }
然后,调用到了com.netflix.discovery.endpoint.EndpointUtils#getServiceUrlsFromConfig方法
public static List<String> getServiceUrlsFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) {
List<String> orderedUrls = new ArrayList<String>(); String region = getRegion(clientConfig); String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion()); if (availZones == null || availZones.length == 0) { availZones = new String[1]; availZones[0] = DEFAULT_ZONE; } logger.debug("The availability zone for the given region {} are {}", region, availZones); int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones); List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[myZoneOffset]); if (serviceUrls != null) { orderedUrls.addAll(serviceUrls); } int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1); while (currentOffset != myZoneOffset) { serviceUrls = clientConfig.getEurekaServerServiceUrls(availZones[currentOffset]); if (serviceUrls != null) { orderedUrls.addAll(serviceUrls); } if (currentOffset == (availZones.length - 1)) { currentOffset = 0; } else { currentOffset++; } } if (orderedUrls.size() < 1) { throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!"); } return orderedUrls; }
该方法依次操作:获取项目对应的Region、获取项目对应的Zones数组(如果没有,则取默认Zone)、计算项目对应Zones数组的偏移量、获取注册中心、计算当前偏移量、根据当前偏移量获取注册中心地址并将地址添加在地址集合中、返回地址集合
接下来每个步骤单独说明。
(1)获取项目对应的Region
public static String getRegion(EurekaClientConfig clientConfig) {
String region = clientConfig.getRegion(); if (region == null) { region = DEFAULT_REGION; } region = region.trim().toLowerCase(); return region; }
这个没什么可说的,就是从配置文件中获取Region的配置,如果没有,则取默认值,最终将Region转换成大写返回,这里可以使用如下参数设置Region
eureka:
client:
region: test-1
(2)获取项目对应的Zones数组
public String[] getAvailabilityZones(String region) {
String value = this.availabilityZones.get(region); if (value == null) { value = DEFAULT_ZONE; } return value.split(","); }
根据region获取配置文件中zone数组,如果没有配置,则取默认值(上面使用的eureka.client.service-url.defaultZone即是默认配置),若要指定Zone,可以使用如下配置:
eureka:
client:
#client所在zone为availabilityZones的第一个zone,如果未配置,则为defaultZone
prefer-same-zone-eureka: true region: region1 availability-zones: region1: zone1,zone2,zone3 region2: zone4,zone5,zone6 service-url: zone1: http://localhost:1111/eureka/ zone2: http://localhost:1111/eureka/ zone3: http://localhost:1111/eureka/ zone4: http://localhost:1112/eureka/ zone5: http://localhost:1112/eureka/ zone6: http://localhost:1112/eureka/
(3)获取注册中心,计算zone数组下标,获得zone,然后根据zone获取注册中心
public List<String> getEurekaServerServiceUrls(String myZone) {
String serviceUrls = this.serviceUrl.get(myZone); if (serviceUrls == null || serviceUrls.isEmpty()) { serviceUrls = this.serviceUrl.get(DEFAULT_ZONE); } if (!StringUtils.isEmpty(serviceUrls)) { final String[] serviceUrlsSplit = StringUtils .commaDelimitedListToStringArray(serviceUrls); List<String> eurekaServiceUrls = new ArrayList<>(serviceUrlsSplit.length); for (String eurekaServiceUrl : serviceUrlsSplit) { if (!endsWithSlash(eurekaServiceUrl)) { eurekaServiceUrl += "/"; } eurekaServiceUrls.add(eurekaServiceUrl.trim()); } return eurekaServiceUrls; } return new ArrayList<>(); }
获取zone配置的注册中心,如果没有,则取默认的注册中心,然后使用逗号切割后组装成集合返回。
当我们在使用Ribbon来实现服务调用时,对于zone的设置可以实现区域亲和性,Ribbon会优先访问属于同一Zone中的服务实例,只有当同一zone中没有可用实例后,才会访问其他zone中的实例。所以通过zone属性的定义,配合实际部署的物理结构,我们就可以有效的设计出针对区域性故障的容错集群。
2、服务注册
在com.netflix.discovery.DiscoveryClient构造函数中调用了com.netflix.discovery.DiscoveryClient#initScheduledTasks方法
private void initScheduledTasks() {
if (clientConfig.shouldFetchRegistry()) { // registry cache refresh timer int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); cacheRefreshTask = new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ); scheduler.schedule( cacheRefreshTask, registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer heartbeatTask = new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ); scheduler.schedule( heartbeatTask, renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
对于该方法,依次错了如下操作:
判断是否开启服务获取,如果开启,创建一个定时任务,定时刷新服务列表
判断是否开启服务注册,如果开启,添加一个服务续租定时任务;异步注册服务。
接下来我们一一查看操作:
(1)判断是否开启服务注册,如果开启,添加一个服务续租定时任务;异步注册服务。
这个判断使用的参数是上述的eureka.client.register-with-eureka参数,如果配置为true,创建了一个服务续租的定时任务,还创建了一个异步注册的任务。
a、先看异步注册服务类InstanceInfoReplicator,该类实现了Rnnable接口,因此直接看run方法
public void run() {
try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
这里注册的,是discoveryClient.register();这一行代码,查看register方法
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode(); }
可以看到,这里使用了http请求将元数据InstanceInfo请求到注册中心。
b、然后看服务续租定时任务
该任务的执行类是HeartbeatThread,直接看run方法,润方法中调用了com.netflix.discovery.DiscoveryClient#renew方法
boolean renew() {
EurekaHttpResponse<InstanceInfo> httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == Status.OK.getStatusCode(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } }
可以发现该方法,也是使用http的请求,将appname等信息发送给注册中心。
(2)判断是否开启服务获取,如果开启,创建一个定时任务,定时刷新服务列表
这个判断对应上述使用的参数eureka.client.fetch-registry,如果开启则创建一个定时任务,该定时任务的执行频率等都是使用参数配置的(参数内容不再单独说明,看属性名称即可判断配置参数名称),然后主要看调用的实现类CacheRefreshThread,该实现类的run方法调用了com.netflix.discovery.DiscoveryClient#refreshRegistry方法:
@VisibleForTesting
void refreshRegistry() { try { boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries(); boolean remoteRegionsModified = false; // This makes sure that a dynamic change to remote regions to fetch is honored. String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions(); if (null != latestRemoteRegions) { String currentRemoteRegions = remoteRegionsToFetch.get(); if (!latestRemoteRegions.equals(currentRemoteRegions)) { // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync synchronized (instanceRegionChecker.getAzToRegionMapper()) { if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) { String[] remoteRegions = latestRemoteRegions.split(","); remoteRegionsRef.set(remoteRegions); instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions); remoteRegionsModified = true; } else { logger.info("Remote regions to fetch modified concurrently," + " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions); } } } else { // Just refresh mapping to reflect any DNS/Property change instanceRegionChecker.getAzToRegionMapper().refreshMapping(); } } boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } if (logger.isDebugEnabled()) { StringBuilder allAppsHashCodes = new StringBuilder(); allAppsHashCodes.append("Local region apps hashcode: "); allAppsHashCodes.append(localRegionApps.get().getAppsHashCode()); allAppsHashCodes.append(", is fetching remote regions? "); allAppsHashCodes.append(isFetchingRemoteRegionRegistries); for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) { allAppsHashCodes.append(", Remote region: "); allAppsHashCodes.append(entry.getKey()); allAppsHashCodes.append(" , apps hashcode: "); allAppsHashCodes.append(entry.getValue().getAppsHashCode()); } logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ", allAppsHashCodes); } } catch (Throwable e) { logger.error("Cannot fetch registry from server", e); } }
前面的一大堆都是校验,后面的一大堆都是输出,唯一有用的是:boolean success = fetchRegistry(remoteRegionsModified);里面会根据是否是第一发起服务获取请求做不同的请求处理。
3、注册中心
Eureka server对于各类rest请求的定义都位于com.netflix.eureka.resources包下,以服务注册方法为例:com.netflix.eureka.resources.ApplicationResource#addInstance
@POST
@Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // validate that the instanceinfo contains all the necessary required fields if (isBlank(info.getId())) { return Response.status(400).entity("Missing instanceId").build(); } else if (isBlank(info.getHostName())) { return Response.status(400).entity("Missing hostname").build(); } else if (isBlank(info.getIPAddr())) { return Response.status(400).entity("Missing ip address").build(); } else if (isBlank(info.getAppName())) { return Response.status(400).entity("Missing appName").build(); } else if (!appName.equals(info.getAppName())) { return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null) { return Response.status(400).entity("Missing dataCenterInfo").build(); } else if (info.getDataCenterInfo().getName() == null) { return Response.status(400).entity("Missing dataCenterInfo Name").build(); } // handle cases where clients may be registering with bad DataCenterInfo with missing data DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible }
上面一通校验,对业务逻辑有影响的只有registry.register(info, "true".equals(isReplication));最终调用到org.springframework.cloud.netflix.eureka.server.InstanceRegistry#register(com.netflix.appinfo.InstanceInfo, boolean)方法
public void register(final InstanceInfo info, final boolean isReplication) {
handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
super.register(info, isReplication);
}
可以看到,先调用了handleRegistration方法将注册事件通知出去,然后调用了父类的register方法将服务注册。