【一起學源碼-微服務】Nexflix Eureka 源碼三:EurekaServer啟動之EurekaServer上下文EurekaClient創建


前言

上篇文章已經介紹了 Eureka Server 環境和上下文初始化的一些代碼,其中重點講解了environment初始化使用的單例模式,以及EurekaServerConfigure基於接口對外暴露配置方法的設計方式。這一講就是講解Eureka Server上下文初始化剩下的內容:Eureka Client初始化。

如若轉載 請標明來源:一枝花算不算浪漫

EurekaServer上下文構建之Client

EurekaClientConfigure創建過程

因為eurekaSever是集群部署的,所以每個eurekaServer都需要注冊到其他注冊中心節點。這里自己既是一個eurekaServer,也是一個eurekaClient。

截取EurekaServer中初始化上下文代碼:

// 3、初始化eureka-server內部的一個eureka-client(用來跟其他的eureka-server節點做注冊和通信)
// 類的開頭已經說明了:EurekaInstanceConfig其實就是eureka client相關的配置類
if (eurekaClient == null) {
    EurekaInstanceConfig instanceConfig = isCloud(ConfigurationManager.getDeploymentContext())
            ? new CloudInstanceConfig()
            : new MyDataCenterInstanceConfig();
    
    applicationInfoManager = new ApplicationInfoManager(
            instanceConfig, new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get());

    // DefaultEurekaClientConfig類似於上面的DefaultEurekaServerConfig類實現
    EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
    eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
} else {
    applicationInfoManager = eurekaClient.getApplicationInfoManager();
}

再看下eurekaClientConfig創建的代碼:

public DefaultEurekaClientConfig(String namespace) {
    this.namespace = namespace.endsWith(".")
            ? namespace
            : namespace + ".";

    this.configInstance = Archaius1Utils.initConfig(CommonConstants.CONFIG_FILE_NAME);
    this.transportConfig = new DefaultEurekaTransportConfig(namespace, configInstance);
}

public static DynamicPropertyFactory initConfig(String configName) {

    DynamicPropertyFactory configInstance = DynamicPropertyFactory.getInstance();
	/**
	 * 獲取eureka client配置文件,類似於 {@link DefaultEurekaServerConfig}中的:
	 * String eurekaPropsFile = EUREKA_PROPS_FILE.get();
	 * private static final DynamicStringProperty EUREKA_PROPS_FILE = DynamicPropertyFactory
	 *             .getInstance().getStringProperty("eureka.server.props","eureka-server");
	 */
	DynamicStringProperty EUREKA_PROPS_FILE = configInstance.getStringProperty("eureka.client.props", configName);

    String env = ConfigurationManager.getConfigInstance().getString(EUREKA_ENVIRONMENT, "test");
    ConfigurationManager.getConfigInstance().setProperty(ARCHAIUS_DEPLOYMENT_ENVIRONMENT, env);

    String eurekaPropsFile = EUREKA_PROPS_FILE.get();
    try {
        ConfigurationManager.loadCascadedPropertiesFromResources(eurekaPropsFile);
    } catch (IOException e) {
        logger.warn(
                "Cannot find the properties specified : {}. This may be okay if there are other environment "
                        + "specific properties or the configuration is installed with a different mechanism.",
                eurekaPropsFile);

    }

    return configInstance;
}

看到上面代碼想到了什么?這完全跟EurekaServerConfig創建的邏輯一樣的呀,代碼和DefaultEurekaServerConfig一致的邏輯。最后都是交給ConfigurationManager來管理。

EurekaClient創建過程

接着再來看eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);代碼:

這段代碼確實很長,我們一段段來解讀,解讀完后再看代碼:

  1. 基於ApplicationInfoManager(包含了服務實例的信息、配置,作為服務實例管理的一個組件),eureka client相關的配置,一起構建了一個EurekaClient。

  2. 這里有兩個配置:config.shouldFetchRegistry()config.shouldRegisterWithEureka()

    config.shouldFetchRegistry()
    是否需要注冊到別的注冊中心。eurekaServer有個配置:eureka.client.fetchRegistry,單機情況下為false。false表示自己就是注冊中心。我的職責就是維護服務實例,並不需要去檢索服務

    config.shouldRegisterWithEureka()
    是否要向別的注冊中心注冊自己。eurekaServer有個配置:eureka.client.registerWithEureka,單機情況下為false。false表示自己不需要向注冊中心注冊自己

  3. 創建線程池調度任務

  4. 創建一個心跳線程池

  5. 創建一個緩存刷新線程池

  6. 初始化線程調度任務

具體代碼如下,添加了一些代碼備注:

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
				Provider<BackupRegistry> backupRegistryProvider) {
	if (args != null) {
		this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
		this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
		this.eventListeners.addAll(args.getEventListeners());
		this.preRegistrationHandler = args.preRegistrationHandler;
	} else {
		this.healthCheckCallbackProvider = null;
		this.healthCheckHandlerProvider = null;
		this.preRegistrationHandler = null;
	}

	this.applicationInfoManager = applicationInfoManager;
	InstanceInfo myInfo = applicationInfoManager.getInfo();

	clientConfig = config;
	staticClientConfig = clientConfig;
	transportConfig = config.getTransportConfig();
	instanceInfo = myInfo;
	if (myInfo != null) {
		// AppName是服務名稱,instanceInfo.getId就是服務實例id,類似於:ServiceA/0001
		appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
	} else {
		logger.warn("Setting instanceInfo to a passed in null value");
	}

	this.backupRegistryProvider = backupRegistryProvider;

	this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
	localRegionApps.set(new Applications());

	fetchRegistryGeneration = new AtomicLong(0);

	remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
	remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

	// 是否需要注冊到別的注冊中心。eurekaServer有個配置:eureka.client.fetchRegistry,單機情況下為false。false表示自己就是注冊中心。我的職責就是維護服務實例,並不需要去檢索服務
	if (config.shouldFetchRegistry()) {
		this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
	} else {
		this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
	}
	// eurekaServer有個配置:eureka.client.registerWithEureka,單機情況下為false。false表示自己不需要向注冊中心注冊自己
	if (config.shouldRegisterWithEureka()) {
		this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
	} else {
		this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
	}

	logger.info("Initializing Eureka in region {}", clientConfig.getRegion());

	// 不需要注冊也不需要抓取 釋放不必要的資源
	if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
		logger.info("Client configured to neither register nor query for data.");
		scheduler = null;
		heartbeatExecutor = null;
		cacheRefreshExecutor = null;
		eurekaTransport = null;
		instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

		// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
		// to work with DI'd DiscoveryClient
		DiscoveryManager.getInstance().setDiscoveryClient(this);
		DiscoveryManager.getInstance().setEurekaClientConfig(config);

		initTimestampMs = System.currentTimeMillis();
		logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
				initTimestampMs, this.getApplications().size());

		return;  // no need to setup up an network tasks and we are done
	}

	try {
		// default size of 2 - 1 each for heartbeat and cacheRefresh
		// 創建一個支持調度的線程池
		scheduler = Executors.newScheduledThreadPool(2,
				new ThreadFactoryBuilder()
						.setNameFormat("DiscoveryClient-%d")
						.setDaemon(true)
						.build());

		// 創建一個心跳檢查的線程池,最大線程數為5
		heartbeatExecutor = new ThreadPoolExecutor(
				1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
				new SynchronousQueue<Runnable>(),
				new ThreadFactoryBuilder()
						.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
						.setDaemon(true)
						.build()
		);  // use direct handoff

		// 支持緩存刷新的線程池,最大線程數為5
		cacheRefreshExecutor = new ThreadPoolExecutor(
				1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
				new SynchronousQueue<Runnable>(),
				new ThreadFactoryBuilder()
						.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
						.setDaemon(true)
						.build()
		);  // use direct handoff

		// 支持底層的eureka client跟eureka server進行網絡通信的組件
		eurekaTransport = new EurekaTransport();
		// 發送http請求,調用restful接口
		scheduleServerEndpointTask(eurekaTransport, args);

		AzToRegionMapper azToRegionMapper;
		if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
			azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
		} else {
			azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
		}
		if (null != remoteRegionsToFetch.get()) {
			azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
		}
		instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
	} catch (Throwable e) {
		throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
	}

	// 如果要抓取注冊表,但是抓取失敗后,需要從備份中讀取
	if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
		fetchRegistryFromBackup();
	}

	// call and execute the pre registration handler before all background tasks (inc registration) is started
	if (this.preRegistrationHandler != null) {
		this.preRegistrationHandler.beforeRegistration();
	}
	// 初始化調度任務
	initScheduledTasks();

	try {
		Monitors.registerObject(this);
	} catch (Throwable e) {
		logger.warn("Cannot register timers", e);
	}

	// This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
	// to work with DI'd DiscoveryClient
	DiscoveryManager.getInstance().setDiscoveryClient(this);
	DiscoveryManager.getInstance().setEurekaClientConfig(config);

	initTimestampMs = System.currentTimeMillis();
	logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
			initTimestampMs, this.getApplications().size());
}
/**
 * Initializes all scheduled tasks.
 */
private void initScheduledTasks() {
	// 抓取注冊表的定時任務,
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
		// registryFetchIntervalSeconds默認為30s
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        // 執行cacheRefreshExecutor調度任務,默認是30s
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    // 如果要將自己注冊到其他注冊中心
    if (clientConfig.shouldRegisterWithEureka()) {
    	//  默認也是30s
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // Heartbeat timer
		// 執行heartbeatExecutor心跳檢查,默認是30s
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo replicator
		// 創建服務副本傳播器
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize

		// 創建服務實例狀態變更的監聽器
        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        // 執行線程
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

總結

如果是eureka server的話,我們在玩兒spring cloud的時候,會將這個fetchRegistry給手動設置為false,如果是eureka server集群的話,就還是要保持為true。registerWithEureka也要設置為true。

(1)讀取EurekaClientConfig,包括TransportConfig
(2)保存EurekaInstanceConfig和InstanceInfo
(3)處理了是否要注冊以及抓取注冊表,如果不要的話,釋放一些資源
(4)支持調度的線程池
(5)支持心跳的線程池
(6)支持緩存刷新的線程池
(7)EurekaTransport,支持底層的eureka client跟eureka server進行網絡通信的組件,對網絡通信組件進行了一些初始化的操作
(8)如果要抓取注冊表的話,在這里就會去抓取注冊表了,但是如果說你配置了不抓取,那么這里就不抓取了
(9)初始化調度任務:如果要抓取注冊表的話,就會注冊一個定時任務,按照你設定的那個抓取的間隔,每隔一定時間(默認是30s),去執行一個CacheRefreshThread,給放那個調度線程池里去了;如果要向eureka server進行注冊的話,會搞一個定時任務,每隔一定時間發送心跳,執行一個HeartbeatThread;創建了服務實例副本傳播器,將自己作為一個定時任務進行調度;創建了服務實例的狀態變更的監聽器,如果你配置了監聽,那么就會注冊監聽器

申明

本文章首發自本人博客:https://www.cnblogs.com/wang-meng 和公眾號:壹枝花算不算浪漫,如若轉載請標明來源!

感興趣的小伙伴可關注個人公眾號:壹枝花算不算浪漫

22.jpg


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM