Apollo配置中心源碼分析
1. apollo的核心代碼分享
-
SpringApplication啟動的關鍵步驟
-
在SpringApplication中,會加載所有實現了Init方法的類
protected void applyInitializers(ConfigurableApplicationContext context) {
for (ApplicationContextInitializer initializer : getInitializers()) {
Class<?> requiredType = GenericTypeResolver.resolveTypeArgument(
initializer.getClass(), ApplicationContextInitializer.class);
Assert.isInstanceOf(requiredType, context, "Unable to call initializer.");
initializer.initialize(context);
}
}
-
通過上述步驟,Apollo自己實現的ApplicationContextInitializer也就 被加載到容器中了。具體的加載流程如下:
1.initialize-> 2.initializeSystemProperty(environment) 讀取項目中Apollo相關的配置文件,在首次讀取的時候都是為空的,配置文件還沒有加載進來;如果讀到了相關配置,就會將配置信息放到容器的環境變量中。 3.
CompositePropertySource composite = new CompositePropertySource(PropertySourcesConstants.APOLLO_BOOTSTRAP_PROPERTY_SOURCE_NAME);
//循環遍歷項目配置的namespace,
for (String namespace : namespaceList) {
//1.調用trySync(),來同步apollo的和本地緩存的配置信息
//2.將這些配置信息轉換為應用的全局property
Config config = ConfigService.getConfig(namespace);
composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));
}
environment.getPropertySources().addFirst(composite);
}
2.Apollo啟動一覽
2.1 ApolloApplicationContextInitializer的作用
定義apollo的容器啟動的時候具體的工作事項
ApolloApplicationContextInitializer implements
ApplicationContextInitializer<ConfigurableApplicationContext>
容器啟動的時候調用init方法
@Override
public void initialize(ConfigurableApplicationContext context) {
ConfigurableEnvironment environment = context.getEnvironment();
------
//關鍵步驟
for (String namespace : namespaceList) {
//關鍵步驟:
Config config = ConfigService.getConfig(namespace);
/*
1.調用ConfigService.getService
public static Config getConfig(String namespace) {
return s_instance.getManager().getConfig(namespace);
}
2.DefaultConfigManager.getConfig
if (config == null) {
ConfigFactory factory = m_factoryManager.getFactory(namespace);
config = factory.create(namespace);
m_configs.put(namespace, config);
}
3.DefaultConfigFactory.create(String namespace)
DefaultConfig defaultConfig =
new DefaultConfig(namespace, createLocalConfigRepository(namespace));
4.createLocalConfigRepository-->new LocalFileConfigRepository(namespace, createRemoteConfigRepository(namespace));
5.調用 LocalFileConfigRepository的構造方法 --> RemoteConfigRepository
6.調用RemoteConfigRepository構造方法
public RemoteConfigRepository(String namespace) {
m_namespace = namespace;
m_configCache = new AtomicReference<>();
m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
m_longPollServiceDto = new AtomicReference<>();
m_remoteMessages = new AtomicReference<>();
m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
m_configNeedForceRefresh = new AtomicBoolean(true);
m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
m_configUtil.getOnErrorRetryInterval() * 8);
gson = new Gson();
this.trySync();
this.schedulePeriodicRefresh();
this.scheduleLongPollingRefresh();
}
*/
composite.addPropertySource(configPropertySourceFactory.getConfigPropertySource(namespace, config));
}
environment.getPropertySources().addFirst(composite);
}
終上,在容器啟動的時候,會調用RemoteConfigRepository的構造方法,而實現配置中心的同步主要是調用trySync,schedulePeriodicRefresh,scheduleLongPollingRefresh這個三個方法來實現配置的實時同步
2.2trySync()
protected boolean trySync() {
try {
sync();
return true;
} catch (Throwable ex) {
Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
logger
.warn("Sync config failed, will retry. Repository {}, reason: {}", this.getClass(), ExceptionUtil
.getDetailMessage(ex));
}
return false;
}
//實際調用
@Override
protected synchronized void sync() {
Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
try {
//從緩存中獲取,如果有的話,啟動的時候previos唯恐
ApolloConfig previous = m_configCache.get();
//獲取當前的配置文件
ApolloConfig current = loadApolloConfig();
//比較兩者是否有差異,
if (previous != current) {
logger.debug("Remote Config refreshed!");
//如果緩存的配置信息與當前查數據庫獲取到的信息不同,那么就將從數據庫中獲取到的配置信息放到緩存中。這樣在程序啟動的時候,configCache就完成了初始化
m_configCache.set(current);
this.fireRepositoryChange(m_namespace, this.getConfig());
}
----
}
//如果兩者有差異,就觸發此操作
protected void fireRepositoryChange(String namespace, Properties newProperties) {
for (RepositoryChangeListener listener : m_listeners) {
try {
//如果兩者有差異,那么刷新緩存配置,並且將重寫本地的緩存文件
listener.onRepositoryChange(namespace, newProperties);
} catch (Throwable ex) {
Tracer.logError(ex);
logger.error("Failed to invoke repository change listener {}", listener.getClass(), ex);
}
}
}
2.3 schedulePeriodicRefresh
開啟多線程,調用 trySync();
private void schedulePeriodicRefresh() {
logger.debug("Schedule periodic refresh with interval: {} {}",
m_configUtil.getRefreshInterval(), m_configUtil.getRefreshIntervalTimeUnit());
m_executorService.scheduleAtFixedRate(
new Runnable() {
@Override
public void run() {
Tracer.logEvent("Apollo.ConfigService", String.format("periodicRefresh: %s", m_namespace));
logger.debug("refresh config for namespace: {}", m_namespace);
trySync();
Tracer.logEvent("Apollo.Client.Version", Apollo.VERSION);
}
}, m_configUtil.getRefreshInterval(), m_configUtil.getRefreshInterval(),
m_configUtil.getRefreshIntervalTimeUnit());
}
2.4 scheduleLongPollingRefresh
private void scheduleLongPollingRefresh() {
remoteConfigLongPollService.submit(m_namespace, this);
}
public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
boolean added = m_longPollNamespaces.put(namespace, remoteConfigRepository);
m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);
if (!m_longPollStarted.get()) {
startLongPolling();
}
return added;
}
整個apollo配置中心的邏輯就是這樣,簡單的說就是無線循環的去獲取配置信息,當獲取到的配置信息與上次獲取到的不同那么就刷新容器緩存的配置項並且更新客戶端緩存的配置信息。
3. 注解ApolloConfigChangeListener分析
3.1@ApolloConfigChangeListener實現原理
Apollo配置中心有聲明一個后置處理器,所以在程序啟動的時候,spring容器會自動加載這個PostProcessor。
類圖如下
/**
*
*/
public abstract class ApolloProcessor implements BeanPostProcessor, PriorityOrdered {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName)
throws BeansException {
Class clazz = bean.getClass();
for (Field field : findAllField(clazz)) {
processField(bean, beanName, field);
}
for (Method method : findAllMethod(clazz)) {
processMethod(bean, beanName, method);
}
return bean;
}
由ApolloProcessor的具體實現可以看到,在postProcessBeforeInitialization(后置處理器生成之前,會調用子類的processField、processMethod方法)。就是說在ApolloProcessor構造后置處理器之前,會調用ApolloAnnotationProcessor的processMethod
ApolloAnnotationProcessor的具體實現
@Override
protected void processMethod(final Object bean, String beanName, final Method method) {
//判斷方法上是否加上ApolloConfigChangeListener注解
ApolloConfigChangeListener annotation = AnnotationUtils
.findAnnotation(method, ApolloConfigChangeListener.class);
if (annotation == null) {
return;
}
Class<?>[] parameterTypes = method.getParameterTypes();
Preconditions.checkArgument(parameterTypes.length == 1,
"Invalid number of parameters: %s for method: %s, should be 1", parameterTypes.length,
method);
Preconditions.checkArgument(ConfigChangeEvent.class.isAssignableFrom(parameterTypes[0]),
"Invalid parameter type: %s for method: %s, should be ConfigChangeEvent", parameterTypes[0],
method);
//將 標有注解ApolloConfigChangeListener的方法設為公有的
ReflectionUtils.makeAccessible(method);
//ApolloConfigChangeListener注解上是否加上指定的namespace,如果沒有的話,默認使用的namespace為application
String[] namespaces = annotation.value();
String[] annotatedInterestedKeys = annotation.interestedKeys();
Set<String> interestedKeys = annotatedInterestedKeys.length > 0 ? Sets.newHashSet(annotatedInterestedKeys) : null;
ConfigChangeListener configChangeListener = new ConfigChangeListener() {
@Override
public void onChange(ConfigChangeEvent changeEvent) {
ReflectionUtils.invokeMethod(method, bean, changeEvent);
}
};
for (String namespace : namespaces) {
Config config = ConfigService.getConfig(namespace);
if (interestedKeys == null) {
config.addChangeListener(configChangeListener);
} else {
config.addChangeListener(configChangeListener, interestedKeys);
}
}
}
RemoteConfigLongPollService
doLongPollingRefresh
notify(lastServiceDto, response.getBody()); //通知同步更新
調用sync()比較配置文件是否發生改變,變化就同步更新
在配置文件發生變動的時候,調用順序就跟第一大節說的順序一致。
4 實際使用
4.1配置多個環境列表(一個portal管理多個環境的配置)
在啟動portal的時候需要添加參數來指定某個環境對應的注冊中心是什么。如下:
在啟動Portal的時候,當點擊的是dev也簽,調用的注冊中心是dev_meta;
-Dapollo_profile=github,auth
-Dspring.datasource.url=jdbc:mysql://yun1:3306/ApolloPortalDB?characterEncoding=utf8
-Dspring.datasource.username=root
-Dspring.datasource.password=Blue123!
-Ddev_meta=http://localhost:8080
-Dfat_meta=http://yun2:8080
-Dserver.port=8070
****在apollo中,可以支持多個環境列表的,通過閱讀源碼可以知道;在portal模塊啟動的時候,Apollo會將PortalDB庫中的ServerConfig表中的數據添加到運行變量中去,其中就有環境列表的信息,這里需要手動加上去,並且用逗號隔開,添加的值也只能是它規定的那幾個值。代碼如下:
-
獲取表中的數據並將它們設置到環境變量中
public List<Env> portalSupportedEnvs() { String[] configurations = getArrayProperty("apollo.portal.envs", new String[]{"FAT", "UAT", "PRO"}); List<Env> envs = Lists.newLinkedList(); for (String env : configurations) { envs.add(Env.fromString(env)); } return envs; }
public PortalDBPropertySource() { super("DBConfig", Maps.newConcurrentMap()); } //將PortalDB.ServerConfig中的表數據全部放入到運行變量中 @Override protected void refresh() { Iterable<ServerConfig> dbConfigs = serverConfigRepository.findAll(); for (ServerConfig config: dbConfigs) { String key = config.getKey(); Object value = config.getValue(); if (this.source.isEmpty()) { logger.info("Load config from DB : {} = {}", key, value); } else if (!Objects.equals(this.source.get(key), value)) { logger.info("Load config from DB : {} = {}. Old value = {}", key, value, this.source.get(key)); } this.source.put(key, value); } }
4.2 指定運行環境
- 1.在默認路徑 /opt/settings/server.properties中指定代碼的運行時環境。在項目啟動的時候,會找到classpath路徑下面的 apollo-env.properties,由它來指定具體的環境與注冊中心的對應關系。這樣,就不需要添加-Dapollo.mata這個變量了
MetaDomainConsts
static {
Properties prop = new Properties();
prop = ResourceUtils.readConfigFile("apollo-env.properties", prop);
Properties env = System.getProperties();
domains.put(Env.LOCAL,
env.getProperty("local_meta", prop.getProperty("local.meta", DEFAULT_META_URL)));
domains.put(Env.DEV,
env.getProperty("dev_meta", prop.getProperty("dev.meta", DEFAULT_META_URL)));
domains.put(Env.FAT,
env.getProperty("fat_meta", prop.getProperty("fat.meta", DEFAULT_META_URL)));
domains.put(Env.UAT,
env.getProperty("uat_meta", prop.getProperty("uat.meta", DEFAULT_META_URL)));
domains.put(Env.LPT,
env.getProperty("lpt_meta", prop.getProperty("lpt.meta", DEFAULT_META_URL)));
domains.put(Env.PRO,
env.getProperty("pro_meta", prop.getProperty("pro.meta", DEFAULT_META_URL)));
}