Spring Cloud Eureka(六):Eureka Client 如何注冊到Eureka Server


1、本節概要

根據前文我們對Eureka Server 有了一定的了解,本節我們主要學習Eureka Client 與 Eureka Server 如何通訊的及相關通信機制是什么,本文會弄清楚一下幾個問題:

  • @EnableDiscoveryClient 和 @EnableEurekaClient的區別

  • Eureka Client 啟動時做了什么事情(初始化工作)

  • Eureka Client 怎么注冊到 Eureka Server(服務注冊)

  • 怎么獲取 Eureka Server 上的服務的(服務獲取)

  • 怎么保證 Eureka Client 本地的服務列表與Eureka Server 上的服務列表保持一致的(服務同步)

2、@EnableDiscoveryClient 和@EnableEurekaClient 的區別

當我們使用服務發現的注解時,卻發現了兩種注解,經過嘗試發現兩者的使用效果是一樣的,那么他們的區別是什么呢,通過查詢官方文檔是這樣解釋的:

Spring Cloud Commons provides the @EnableDiscoveryClient annotation. This looks for implementations of the DiscoveryClient interface with META-INF/spring.factories. Implementations of the Discovery Client add a configuration class to spring.factories under the org.springframework.cloud.client.discovery.EnableDiscoveryClient key. Examples of DiscoveryClient implementations include Spring Cloud Netflix Eureka, Spring Cloud Consul Discovery, and Spring Cloud Zookeeper Discovery.
    
Spring Cloud Commons 提供 @EnableDiscoveryClient 注解。這將使用META-INF / spring.factories查找DiscoveryClient接口的實現。Discovery Client的實現將配置類添加到org.springframework.cloud.client.discovery.EnableDiscoveryClient項下的spring.factories。比如 DiscoveryClient 實現包括Spring Cloud Netflix Eureka,Spring Cloud Consul Discovery和Spring Cloud Zookeeper Discovery。
By default, implementations of DiscoveryClient auto-register the local Spring Boot server with the remote discovery server. This behavior can be disabled by setting autoRegister=false in @EnableDiscoveryClient.
    
默認情況下,DiscoveryClient 的實現會使用遠程發現服務器自動注冊本地Spring Boot服務器。可以通過在@EnableDiscoveryClient中設置autoRegister = false來禁用此行為。

簡而言之 spring cloud 中服務發現有多種實現(eureka、consul、zookeeper等等),@EnableDiscoveryClient 基於 spring-cloud-commons, @EnableEurekaClient 基於 spring-cloud-netflix。

如果選用的注冊中心是eureka,那么就推薦@EnableEurekaClient,如果是其他的注冊中心,那么推薦使用@EnableDiscoveryClient。

我們看一下這兩個注解的源碼:

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

	/**
	 * 如果為true,本服務獎自動注冊到指定的服務中心,默認是啟用的,
     可以通過設置為 false 來禁用自動注冊
	 */
	boolean autoRegister() default true;
}

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface EnableEurekaClient {

}

3、Eureka Client 啟動時做了什么

我們根據注解 @EnableEurekaClient 找到該注解所在工程 spring-cloud-netflix-eureka-client-2.0.1.RELEASE.jar 下的 META-INF/spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration

本文主要看 org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,其它的主要是客戶端查找配置中心及客戶端負載均衡(Ribbon)的引導服務,在后續文章的學習總結中再做介紹。

下面我們就從 EurekaClientAutoConfiguration 的源碼着手,看看都有哪些操作,先看一下該配置類的注解有哪些

@Configuration
# 加載配置文件解析
@EnableConfigurationProperties 
# 加載client配置項
@ConditionalOnClass(EurekaClientConfig.class)
# 加載 DiscoveryClientOptionalArgsConfiguration 到容器
@Import(DiscoveryClientOptionalArgsConfiguration.class)
# 客戶端啟用引導標記
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)
# 啟用eureka client,默認啟用
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
# 當前配置類EurekaClientAutoConfiguration 加載完畢后的后續加載操作
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
		CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
		"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
		"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})
public class EurekaClientAutoConfiguration {
}

3.1 @ConditionalOnClass(EurekaClientConfig.class)

@ImplementedBy(DefaultEurekaClientConfig.class)
public interface EurekaClientConfig {}

查看默認實現 DefaultEurekaClientConfig,eureka client 默認值加載

@Singleton
@ProvidedBy(DefaultEurekaClientConfigProvider.class)
public class DefaultEurekaClientConfig implements EurekaClientConfig {
    /** @deprecated */
    @Deprecated
    public static final String DEFAULT_NAMESPACE = "eureka.";
    public static final String DEFAULT_ZONE = "defaultZone";
    private final String namespace;
    private final DynamicPropertyFactory configInstance;
    private final EurekaTransportConfig transportConfig;

    public DefaultEurekaClientConfig() {
        this("eureka");
    }

    public DefaultEurekaClientConfig(String namespace) {
        this.namespace = namespace.endsWith(".") ? namespace : namespace + ".";
        this.configInstance = Archaius1Utils.initConfig("eureka-client");
        this.transportConfig = new DefaultEurekaTransportConfig(namespace, this.configInstance);
    }

    public int getRegistryFetchIntervalSeconds() {
        return this.configInstance.getIntProperty(this.namespace + "client.refresh.interval", 30).get();
    }

    public int getInstanceInfoReplicationIntervalSeconds() {
        return this.configInstance.getIntProperty(this.namespace + "appinfo.replicate.interval", 30).get();
    }

    public int getInitialInstanceInfoReplicationIntervalSeconds() {
        return this.configInstance.getIntProperty(this.namespace + "appinfo.initial.replicate.time", 40).get();
    }

    public int getEurekaServiceUrlPollIntervalSeconds() {
        return this.configInstance.getIntProperty(this.namespace + "serviceUrlPollIntervalMs", 300000).get() / 1000;
    }

    public String getProxyHost() {
        return this.configInstance.getStringProperty(this.namespace + "eurekaServer.proxyHost", (String)null).get();
    }

........................eureka client 默認值加載............................

}

3.2 @ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)

@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
public class EurekaDiscoveryClientConfiguration {

	class Marker {}

	@Bean
	public Marker eurekaDiscoverClientMarker() {
		return new Marker();
	}

    # 具體配置刷新事件的監聽器
	@Configuration
	@ConditionalOnClass(RefreshScopeRefreshedEvent.class)
	protected static class EurekaClientConfigurationRefresher {

		@Autowired(required = false)
		private EurekaClient eurekaClient;

		@Autowired(required = false)
		private EurekaAutoServiceRegistration autoRegistration;

		@EventListener(RefreshScopeRefreshedEvent.class)
		public void onApplicationEvent(RefreshScopeRefreshedEvent event) {
			//This will force the creation of the EurkaClient bean if not already created
			//to make sure the client will be reregistered after a refresh event
			if(eurekaClient != null) {
				eurekaClient.getApplications();
			}
			if (autoRegistration != null) {
				// register in case meta data changed
				this.autoRegistration.stop();
				this.autoRegistration.start();
			}
		}
	}

# 健康檢查配置
	@Configuration
	@ConditionalOnProperty(value = "eureka.client.healthcheck.enabled", matchIfMissing = false)
	protected static class EurekaHealthCheckHandlerConfiguration {

		@Autowired(required = false)
		private HealthAggregator healthAggregator = new OrderedHealthAggregator();

		@Bean
		@ConditionalOnMissingBean(HealthCheckHandler.class)
		public EurekaHealthCheckHandler eurekaHealthCheckHandler() {
			return new EurekaHealthCheckHandler(this.healthAggregator);
		}
	}
}

該配置文件主要做了兩件事情,一個是監聽RefreshScopeRefreshedEvent事件,配置文件動態刷新時觸發。另一個是配置健康檢查處理程序。

3.3、 EurekaClientAutoConfiguration

根據源碼可以看到這里創建了EurekaClientConfigBean、EurekaInstanceConfigBean兩個基本配置,以及EurekaServiceRegistry

@Bean
		@ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
		public ApplicationInfoManager eurekaApplicationInfoManager(
				EurekaInstanceConfig config) {
			InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
			return new ApplicationInfoManager(config, instanceInfo);
		}
  • InstanceInfo

使用EurekaInstanceConfig,通過new InstanceInfoFactory().create(config)創建

  • ApplicationInfoManager

使用InstanceInfo以及EurekaInstanceConfig創建:new ApplicationInfoManager(config, instanceInfo)

  • EurekaClient
@Bean(destroyMethod = "shutdown")
		@ConditionalOnMissingBean(value = EurekaClient.class, search = SearchStrategy.CURRENT)
		public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
			return new CloudEurekaClient(manager, config, this.optionalArgs,
					this.context);
		}

使用ApplicationInfoManager、EurekaClientConfig創建:new CloudEurekaClient(manager, config, this.optionalArgs,this.context)

  • DiscoveryClient
@Bean
	public DiscoveryClient discoveryClient(EurekaInstanceConfig config, EurekaClient client) {
		return new EurekaDiscoveryClient(config, client);
	}

通過EurekaInstanceConfig、EurekaClient創建:new EurekaDiscoveryClient(config, client)

  • EurekaRegistration
@Bean
		@ConditionalOnBean(AutoServiceRegistrationProperties.class)
		@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
		public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient,
													 CloudEurekaInstanceConfig instanceConfig,
													 ApplicationInfoManager applicationInfoManager,
													 @Autowired(required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
			return EurekaRegistration.builder(instanceConfig)
					.with(applicationInfoManager)
					.with(eurekaClient)
					.with(healthCheckHandler)
					.build();
		}

通過EurekaClient、CloudEurekaInstanceConfig、ApplicationInfoManager來創建

  • EurekaAutoServiceRegistration
@Bean
	@ConditionalOnBean(AutoServiceRegistrationProperties.class)
	@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
	public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry registry,
																	   EurekaRegistration registration) {
		return new EurekaAutoServiceRegistration(context, registry, registration);
	}

通過EurekaServiceRegistry、EurekaRegistration來創建

3.4、 EurekaAutoServiceRegistration

該類實現了接口 SmartLifecycle 的方法。SmartLifecycle 是一個接口。當Spring容器加載所有bean並完成初始化之后,會接着回調實現該接口的類中對應的方法。

public class EurekaAutoServiceRegistration implements AutoServiceRegistration, SmartLifecycle, Ordered {

	private static final Log log = LogFactory.getLog(EurekaAutoServiceRegistration.class);

	private AtomicBoolean running = new AtomicBoolean(false);

	private int order = 0;

	private AtomicInteger port = new AtomicInteger(0);

	private ApplicationContext context;

	private EurekaServiceRegistry serviceRegistry;

	private EurekaRegistration registration;

	public EurekaAutoServiceRegistration(ApplicationContext context, EurekaServiceRegistry serviceRegistry, EurekaRegistration registration) {
		this.context = context;
		this.serviceRegistry = serviceRegistry;
		this.registration = registration;
	}

	@Override
	public void start() {
	    ..................
	}
	@Override
	public void stop() {
		this.serviceRegistry.deregister(this.registration);
		this.running.set(false);
	}

	@Override
	public boolean isRunning() {
		return this.running.get();
	}

	@Override
	public int getPhase() {
		return 0;
	}

	@Override
	public boolean isAutoStartup() {
		return true;
	}

	@Override
	public void stop(Runnable callback) {
		stop();
		callback.run();
	}

	@Override
	public int getOrder() {
		return this.order;
	}

	@EventListener(WebServerInitializedEvent.class)
	public void onApplicationEvent(WebServerInitializedEvent event) {
		..................
	}

	@EventListener(ContextClosedEvent.class)
	public void onApplicationEvent(ContextClosedEvent event) {
		if( event.getApplicationContext() == context ) {
			stop();
		}
	}

}
3.4.1 把自身應用實例的信息注冊到eureka server中(start)
@Override
	public void start() {
		// 設置端口號
		if (this.port.get() != 0) {
			if (this.registration.getNonSecurePort() == 0) {
				this.registration.setNonSecurePort(this.port.get());
			}

			if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
				this.registration.setSecurePort(this.port.get());
			}
		}

		// 判斷端口號是否被占用
		if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
//注冊自身實例服務到注冊中心
			this.serviceRegistry.register(this.registration);
//觸發服務注冊時間
			this.context.publishEvent(
					new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
			this.running.set(true);
		}
	}
3.4.2 服務主動下線 stop()
@Override
	public void stop() {
		this.serviceRegistry.deregister(this.registration);
		this.running.set(false);
	}

調用this.serviceRegistry.deregister(this.registration)方法,告知eureka server自身服務要下線

3.5、 EurekaServiceRegistry

根據源碼可知 EurekaAutoServiceRegistration 中的 start()、stop() 方法分別調用了 EurekaServiceRegistry 的 register()、deregister() 方法實現了服務注冊和服務主動下線。下面我們就看一下這兩個方法的實現。

3.5.1 服務注冊 register

調用ApplicationInfoManager的setInstanceStatus方法來更改狀態

@Override
	public void register(EurekaRegistration reg) {
		maybeInitializeClient(reg);

		if (log.isInfoEnabled()) {
			log.info("Registering application " + reg.getInstanceConfig().getAppname()
					+ " with eureka with status "
					+ reg.getInstanceConfig().getInitialStatus());
		}

		reg.getApplicationInfoManager()
				.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());

		reg.getHealthCheckHandler().ifAvailable(healthCheckHandler ->
				reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
	}
	
	private void maybeInitializeClient(EurekaRegistration reg) {
		// force initialization of possibly scoped proxies
		reg.getApplicationInfoManager().getInfo();
		reg.getEurekaClient().getApplications();
	}
	
3.5.2 服務自動下線 deregister

調用ApplicationInfoManager的setInstanceStatus方法來將狀態設置為InstanceInfo.InstanceStatus.DOWN

@Override
	public void deregister(EurekaRegistration reg) {
		if (reg.getApplicationInfoManager().getInfo() != null) {

			if (log.isInfoEnabled()) {
				log.info("Unregistering application " + reg.getInstanceConfig().getAppname()
						+ " with eureka with status DOWN");
			}

			reg.getApplicationInfoManager().setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);

			//shutdown of eureka client should happen with EurekaRegistration.close()
			//auto registration will create a bean which will be properly disposed
			//manual registrations will need to call close()
		}
	}

3.6、 ApplicationInfoManager

設置服務實例狀態並發布StatusChangeEvent事件,通知所有該事件監聽者

/**
     * 設置此實例的狀態。 應用程序可以使用它來標識它是否已准備好接收流量。 在此處設置狀態還會通知所有已注冊的監聽器狀態更改事件。
     */
    public synchronized void setInstanceStatus(InstanceStatus status) {
    //當前服務實例狀態
        InstanceStatus next = instanceStatusMapper.map(status);
        if (next == null) {
            return;
        }
//當前服務實例之前的狀態
        InstanceStatus prev = instanceInfo.setStatus(next);
        if (prev != null) {
            for (StatusChangeListener listener : listeners.values()) {
                try {
                //狀態變化事件通知所有監聽者
                    listener.notify(new StatusChangeEvent(prev, next));
                } catch (Exception e) {
                    logger.warn("failed to notify listener: {}", listener.getId(), e);
                }
            }
        }
    }

3.7、DiscoveryClient -> initScheduledTasks()

這里注冊了StatusChangeListener,之后觸發instanceInfoReplicator.onDemandUpdate()

 /**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // 緩存刷新定時器
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // 心跳定時器
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
            //狀態變更監聽器
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

3.8、InstanceInfoReplicator -> onDemandUpdate

這里的onDemandUpdate()方法主要是執行InstanceInfoReplicator.this.run() 而這個run方法主要是判斷是否dirty,如果是則調用discoveryClient.register()

 public boolean onDemandUpdate() {
        if (rateLimiter.acquire(burstSize, allowedRatePerMinute)) {
            if (!scheduler.isShutdown()) {
                scheduler.submit(new Runnable() {
                    @Override
                    public void run() {
                        logger.debug("Executing on-demand update of local InstanceInfo");
    
                        Future latestPeriodic = scheduledPeriodicRef.get();
                        if (latestPeriodic != null && !latestPeriodic.isDone()) {
                            logger.debug("Canceling the latest scheduled update, it will be rescheduled at the end of on demand update");
                            latestPeriodic.cancel(false);
                        }
    
                        InstanceInfoReplicator.this.run();
                    }
                });
                return true;
            } else {
                logger.warn("Ignoring onDemand update due to stopped scheduler");
                return false;
            }
        } else {
            logger.warn("Ignoring onDemand update due to rate limiter");
            return false;
        }
    }
    
    public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

3.9、DiscoveryClient -> register

最后終於找到了服務注冊的具體真正實現,register() 才是真正去與遠程的Eureka Server交互,注冊服務的操作

 /**
     * 通過REST調用來注冊eureka服務。
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

最后總結一下:

EurekaClientAutoConfiguration 構造了 EurekaClientConfigBean、EurekaInstanceConfigBean 以及 EurekaServiceRegistry,接着在這幾個對象的基礎上進一步構建了 ApplicationInfoManager、CloudEurekaClient等。

其中ApplicationInfoManager 負責變更實例狀態並發布 StatusChangeEvent事件,而CloudEurekaClient繼承了com.netflix.discovery.DiscoveryClient 包含了statusChangeListener 用於響應S tatusChangeEvent,最后觸發的是DiscoveryClient.register方法,與遠程的Eureka Server通信,同步實例狀態。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM