前言
上篇文章已經介紹了 Eureka Server 環境和上下文初始化的一些代碼,其中重點講解了environment初始化使用的單例模式,以及EurekaServerConfigure基於接口對外暴露配置方法的設計方式。這一講就是講解Eureka Server上下文初始化剩下的內容:Eureka Client初始化。
如若轉載 請標明來源:一枝花算不算浪漫
EurekaServer上下文構建之Client
EurekaClientConfigure創建過程
因為eurekaSever是集群部署的,所以每個eurekaServer都需要注冊到其他注冊中心節點。這里自己既是一個eurekaServer,也是一個eurekaClient。
截取EurekaServer中初始化上下文代碼:
// 3、初始化eureka-server內部的一個eureka-client(用來跟其他的eureka-server節點做注冊和通信)
// 類的開頭已經說明了:EurekaInstanceConfig其實就是eureka client相關的配置類
if (eurekaClient == null) {
EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
? new CloudInstanceConfig()
: new MyDataCenterInstanceConfig();
applicationInfoManager = new ApplicationInfoManager(
instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());
// DefaultEurekaClientConfig類似於上面的DefaultEurekaServerConfig類實現
EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
} else {
applicationInfoManager = eurekaClient.getApplicationInfoManager();
}
再看下eurekaClientConfig創建的代碼:
public DefaultEurekaClientConfig(String namespace) {
this.namespace = namespace.endsWith(".")
? namespace
: namespace + ".";
this.configInstance = Archaius1Utils.initConfig(CommonConstants.CONFIG_FILE_NAME);
this.transportConfig = new DefaultEurekaTransportConfig(namespace, configInstance);
}
public static DynamicPropertyFactory initConfig(String configName) {
DynamicPropertyFactory configInstance = DynamicPropertyFactory.getInstance();
/**
* 獲取eureka client配置文件,類似於 {@link DefaultEurekaServerConfig}中的:
* String eurekaPropsFile = EUREKA_PROPS_FILE.get();
* private static final DynamicStringProperty EUREKA_PROPS_FILE = DynamicPropertyFactory
* .getInstance().getStringProperty("eureka.server.props","eureka-server");
*/
DynamicStringProperty EUREKA_PROPS_FILE = configInstance.getStringProperty("eureka.client.props", configName);
String env = ConfigurationManager.getConfigInstance().getString(EUREKA_ENVIRONMENT, "test");
ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, env);
String eurekaPropsFile = EUREKA_PROPS_FILE.get();
try {
ConfigurationManager.loadCascadedPropertiesFromResources(eurekaPropsFile);
} catch (IOException e) {
logger.warn(
"Cannot find the properties specified : {}. This may be okay if there are other environment "
+ "specific properties or the configuration is installed with a different mechanism.",
eurekaPropsFile);
}
return configInstance;
}
看到上面代碼想到了什么?這完全跟EurekaServerConfig創建的邏輯一樣的呀,代碼和DefaultEurekaServerConfig一致的邏輯。最后都是交給ConfigurationManager來管理。
EurekaClient創建過程
接着再來看eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
代碼:
這段代碼確實很長,我們一段段來解讀,解讀完后再看代碼:
-
基於ApplicationInfoManager(包含了服務實例的信息、配置,作為服務實例管理的一個組件),eureka client相關的配置,一起構建了一個EurekaClient。
-
這里有兩個配置:
config.shouldFetchRegistry()
和config.shouldRegisterWithEureka()
config.shouldFetchRegistry()
:
是否需要注冊到別的注冊中心。eurekaServer有個配置:eureka.client.fetchRegistry,單機情況下為false。false表示自己就是注冊中心。我的職責就是維護服務實例,並不需要去檢索服務config.shouldRegisterWithEureka()
:
是否要向別的注冊中心注冊自己。eurekaServer有個配置:eureka.client.registerWithEureka,單機情況下為false。false表示自己不需要向注冊中心注冊自己 -
創建線程池調度任務
-
創建一個心跳線程池
-
創建一個緩存刷新線程池
-
初始化線程調度任務
具體代碼如下,添加了一些代碼備注:
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
this.preRegistrationHandler = args.preRegistrationHandler;
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
this.preRegistrationHandler = null;
}
this.applicationInfoManager = applicationInfoManager;
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
// AppName是服務名稱,instanceInfo.getId就是服務實例id,類似於:ServiceA/0001
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
this.backupRegistryProvider = backupRegistryProvider;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
// 是否需要注冊到別的注冊中心。eurekaServer有個配置:eureka.client.fetchRegistry,單機情況下為false。false表示自己就是注冊中心。我的職責就是維護服務實例,並不需要去檢索服務
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
// eurekaServer有個配置:eureka.client.registerWithEureka,單機情況下為false。false表示自己不需要向注冊中心注冊自己
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
// 不需要注冊也不需要抓取 釋放不必要的資源
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
logger.info("Client configured to neither register nor query for data.");
scheduler = null;
heartbeatExecutor = null;
cacheRefreshExecutor = null;
eurekaTransport = null;
instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
return; // no need to setup up an network tasks and we are done
}
try {
// default size of 2 - 1 each for heartbeat and cacheRefresh
// 創建一個支持調度的線程池
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
// 創建一個心跳檢查的線程池,最大線程數為5
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// 支持緩存刷新的線程池,最大線程數為5
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
// 支持底層的eureka client跟eureka server進行網絡通信的組件
eurekaTransport = new EurekaTransport();
// 發送http請求,調用restful接口
scheduleServerEndpointTask(eurekaTransport, args);
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
// 如果要抓取注冊表,但是抓取失敗后,需要從備份中讀取
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
// call and execute the pre registration handler before all background tasks (inc registration) is started
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
// 初始化調度任務
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
// to work with DI'd DiscoveryClient
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
initTimestampMs = System.currentTimeMillis();
logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
initTimestampMs, this.getApplications().size());
}
/**
* Initializes all scheduled tasks.
*/
private void initScheduledTasks() {
// 抓取注冊表的定時任務,
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
// registryFetchIntervalSeconds默認為30s
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
// 執行cacheRefreshExecutor調度任務,默認是30s
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
// 如果要將自己注冊到其他注冊中心
if (clientConfig.shouldRegisterWithEureka()) {
// 默認也是30s
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
// Heartbeat timer
// 執行heartbeatExecutor心跳檢查,默認是30s
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
// InstanceInfo replicator
// 創建服務副本傳播器
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
// 創建服務實例狀態變更的監聽器
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
instanceInfoReplicator.onDemandUpdate();
}
};
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 執行線程
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
總結
如果是eureka server的話,我們在玩兒spring cloud的時候,會將這個fetchRegistry給手動設置為false,如果是eureka server集群的話,就還是要保持為true。registerWithEureka也要設置為true。
(1)讀取EurekaClientConfig,包括TransportConfig
(2)保存EurekaInstanceConfig和InstanceInfo
(3)處理了是否要注冊以及抓取注冊表,如果不要的話,釋放一些資源
(4)支持調度的線程池
(5)支持心跳的線程池
(6)支持緩存刷新的線程池
(7)EurekaTransport,支持底層的eureka client跟eureka server進行網絡通信的組件,對網絡通信組件進行了一些初始化的操作
(8)如果要抓取注冊表的話,在這里就會去抓取注冊表了,但是如果說你配置了不抓取,那么這里就不抓取了
(9)初始化調度任務:如果要抓取注冊表的話,就會注冊一個定時任務,按照你設定的那個抓取的間隔,每隔一定時間(默認是30s),去執行一個CacheRefreshThread,給放那個調度線程池里去了;如果要向eureka server進行注冊的話,會搞一個定時任務,每隔一定時間發送心跳,執行一個HeartbeatThread;創建了服務實例副本傳播器,將自己作為一個定時任務進行調度;創建了服務實例的狀態變更的監聽器,如果你配置了監聽,那么就會注冊監聽器
申明
本文章首發自本人博客:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!
感興趣的小伙伴可關注個人公眾號:壹枝花算不算浪漫