Apollo源碼閱讀筆記(二)


Apollo源碼閱讀筆記(二)

前面 分析了apollo配置設置到Spring的environment的過程,此文繼續PropertySourcesProcessor.postProcessBeanFactory里面調用的第二個方法initializeAutoUpdatePropertiesFeature(beanFactory),其實也就是配置修改后更新相關處理邏輯。

在繼續分析之前,先來看下配置是怎么自動更新的。

1. 配置更新簡單示例

通過portal頁面,修改配置之后,我們可以通過@ApolloConfigChangeListener來自己監聽配置的變化手動更新,也可以通過@Value標記字段,字段值會自動更新。

1.1 更新event相關對象

無論是哪種方式,都離不開event,所以先了解下event相關的對象

// change事件
public class ConfigChangeEvent {
  private final String m_namespace;
  private final Map<String, ConfigChange> m_changes;
  // 省略了全部方法  
}

// change實體bean
public class ConfigChange {
  private final String namespace;
  private final String propertyName;
  private String oldValue;
  private String newValue;
  private PropertyChangeType changeType;
  // 省略了全部方法  
}

// change類型
public enum PropertyChangeType {
  ADDED, MODIFIED, DELETED
}

1.2 Value注解方式

直接在屬性字段上標記@Value就可以了。

eg:

@Service
public class XXXService {
    @Value("${config.key}")
    private String XXConfig;
    ....
}

如果修改了config.key配置項,那么這里的XXConfig的值是會自動更新的。

1.3 自定義ConfigChangeListener方式

@Value方式實現起來很簡單,但如果要更靈活點,比如加上一些自己的業務處理,那就需要用到@ApolloConfigChangeListener了。

@ApolloConfigChangeListener
private void onChange(ConfigChangeEvent changeEvent) {
	logger.info("配置參數發生變化[{}]", JSON.toJSONString(changeEvent));
	doSomething();
}

標記有@ApolloConfigChangeListener的這個方法,必須帶一個ConfigChangeEvent的入參,通過這個event可以拿到事件的類型、變化的具體字段、變化前后值。

拿到event之后,我們可以根據具體的變化做不同業務處理。

以上是更新的使用,下面來深入研究下源碼的實現。

2. 配置更新監聽listener

先繼續文章開頭留下的尾巴 initializeAutoUpdatePropertiesFeature方法。

2.1 AutoUpdateConfigChangeListener

PropertySourcesProcessor.postProcessBeanFactory –> initializeAutoUpdatePropertiesFeature中。具體邏輯如下:

  1. 構造一個 AutoUpdateConfigChangeListener 對象 [implements ConfigChangeListener];

  2. 拿到前面處理的所有的ConfigPropertySource組成的list,遍歷ConfigPropertySource,設置listener

    ​ configPropertySource.addChangeListener(autoUpdateConfigChangeListener);

    ​ 這里加事件,最終是加在Config上了。

我們前面使用的@Value注解標記的字段,在字段值發生變化時,就是通過這里加的listener,收到通知的。

@Value是org.springframework.beans.factory.annotation.value,Spring的注解。apollo通過自己的SpringValueProcessor來處理它。來看下完整的流程:

  1. SpringValueProcessor繼承了ApolloProcessor,間接實現了BeanPostProcessor

  2. 在啟動的時候,會被調到 postProcessBeforeInitialization,進而調到processField

    1. 判斷字段是否被@Value標記,如果沒有則返回,有則繼續往下
    2. 解析@Value注解里面設置的value
    3. 構造SpringValue對象 new SpringValue(key, value.value(), bean, beanName, field, false)
    4. 將構造的springValue存到SpringValueRegistry的Map<BeanFactory, Multimap<String, SpringValue>>中
  3. 當修改配置發布事件后,AutoUpdateConfigChangeListener就被觸發onChange(ConfigChangeEvent changeEvent)事件

    1. 通過event拿到所有變更的keys

    2. 遍歷keys,通過springValueRegistry.get(beanFactory, key) 拿到SpringValue集合對象

      ​ 這里就是從前面的2.4的map里面獲取的

    3. 判斷配置是否真的發生了變化 shouldTriggerAutoUpdate(changeEvent, key)

    4. 遍歷SpringValue集合,逐一通過反射改變字段的值

到這里,@Value的更新流程就清楚了。

下面來看看自定義listener是怎么通知更新的。

2.2 ConfigChangeListener

更多的時候,我們是通過在方法上標記@ApolloConfigChangeListener來實現自己的監聽處理。[例子見1.3代碼]

通過@ApolloConfigChangeListener注解添加的監聽方法,默認關注的application namespace下的全部配置項。

有關該注解的處理邏輯在 com.ctrip.framework.apollo.spring.annotation.ApolloAnnotationProcessor 中,我們重點關注如下代碼段 processMethod :

protected void processMethod(final Object bean, String beanName, final Method method) {
  ApolloConfigChangeListener annotation = AnnotationUtils.findAnnotation(method, ApolloConfigChangeListener.class);
  if (annotation == null) {
    return;
  }
  Class<?>[] parameterTypes = method.getParameterTypes();
  Preconditions.checkArgument(parameterTypes.length == 1, "Invalid number of parameters: %s for method: %s, should be 1", parameterTypes.length, method);
  Preconditions.checkArgument(ConfigChangeEvent.class.isAssignableFrom(parameterTypes[0]), "Invalid parameter type: %s for method: %s, should be ConfigChangeEvent", parameterTypes[0], method);

  ReflectionUtils.makeAccessible(method);
  String[] namespaces = annotation.value();
  String[] annotatedInterestedKeys = annotation.interestedKeys();
  Set<String> interestedKeys = annotatedInterestedKeys.length > 0 ? Sets.newHashSet(annotatedInterestedKeys) : null;
  // 創建listener  
  ConfigChangeListener configChangeListener = new ConfigChangeListener() {
    @Override
    public void onChange(ConfigChangeEvent changeEvent) {
      ReflectionUtils.invokeMethod(method, bean, changeEvent);
    }
  };

  // 給config設置listener
  for (String namespace : namespaces) {
    Config config = ConfigService.getConfig(namespace);
    if (interestedKeys == null) {
      config.addChangeListener(configChangeListener);
    } else {
      config.addChangeListener(configChangeListener, interestedKeys);
    }
  }
}

經過這段代碼處理,如果有change事件,我們通過@ApolloConfigChangeListener自定義的listener就會收到消息了。

前面了解完了監聽,下面來看下事件的發布。

3. 配置更新事件發布

后台portal頁面修改發布之后,client端怎么接收到事件呢?其實在client啟動后,就會和服務端建一個長連接。代碼見 com.ctrip.framework.apollo.internals.RemoteConfigRepository 。

先來看看RemoteConfigRepository 構造方法

public RemoteConfigRepository(String namespace) {
    m_namespace = namespace;
    m_configCache = new AtomicReference<>();
    m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
    m_httpUtil = ApolloInjector.getInstance(HttpUtil.class);
    m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
    remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
    m_longPollServiceDto = new AtomicReference<>();
    m_remoteMessages = new AtomicReference<>();
    m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
    m_configNeedForceRefresh = new AtomicBoolean(true);
    m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(),
        m_configUtil.getOnErrorRetryInterval() * 8);
    gson = new Gson();
    this.trySync();
    this.schedulePeriodicRefresh();
    this.scheduleLongPollingRefresh();
  }

從上面構造方法最后幾行可以看出,啟動的時候,會先嘗試從server端同步,然后會啟動2個定時刷新任務,一個是定時刷新,一個是長輪詢。不過不管是哪種方式,最終進入的還是 trySync() 方法。

以com.ctrip.framework.apollo.internals.RemoteConfigLongPollService為例

RemoteConfigLongPollService

—> startLongPolling()

—> doLongPollingRefresh(appId, cluster, dataCenter),發起http長連接

—> 收到http response(server端發布了新的配置)

—> notify(lastServiceDto, response.getBody());

—> remoteConfigRepository.onLongPollNotified(lastServiceDto, remoteMessages);

—> 異步調用 remoteConfigRepository.trySync();

在trySync方法中,直接調用sync()后就返回,所以需要看sync()方法內部邏輯。在sync()方法內:

  1. 先獲取本機緩存的當前配置 ApolloConfig previous = m_configCache.get()

  2. 獲取server端的最新配置 ApolloConfig current = loadApolloConfig()

  3. 如果 previous != current ,更新m_configCache

    ​ m_configCache.set(current);
    ​ this.fireRepositoryChange(m_namespace, this.getConfig());

  4. 在fireRepositoryChange里面,遍歷當前namespace下的listener,調用RepositoryChangeListener事件

    ​ listener.onRepositoryChange(namespace, newProperties)

    ​ <—這里事件就傳到LocalFileConfigRepository 這一層了

  5. 本機該namespace的LocalFileConfigRepository實現了RepositoryChangeListener,所以會受到通知調用

    在LocalFileConfigRepository的onRepositoryChange方法中:

    1. 比較newProperties.equals(m_fileProperties),相同就直接return,否則繼續往下

    2. 更新本機緩存文件 updateFileProperties

    3. 觸發 fireRepositoryChange(namespace, newProperties); <—這里事件就傳到Config這一層了

    4. 觸發事件在 DefaultConfig 中得到響應處理。這里先對發生變化的配置做一些處理,然后發ConfigChangeEvent事件

      ​ this.fireConfigChange(new ConfigChangeEvent(m_namespace, actualChanges));

    5. 在DefaultConfig的this.fireConfigChange里面,就會遍歷 listeners,依次調用onChange方法

      ​ 准確的說,是在父類AbstractConfig中實現的;

      ​ 這里的listeners就有前面提到的AutoUpdateConfigChangeListener 和 ApolloAnnotationProcessor 中定義的ConfigChangeListener

至此,事件的發布監聽就形成閉環了,這里fireConfigChange(ConfigChangeEvent)后,@Value標記的字段、@ApolloConfigChangeListener都會被觸發更新。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM