大綱
看本文之前,建議看看 apollo 的官方文檔,特別是數據庫設計文檔。
- 主流程分析
2.1 聊聊細節
2.2 loadConfig() 加載配置
2.3 auditReleases() 方法記錄此次訪問詳情
1. 主流程分析
具體代碼在 com.ctrip.framework.apollo.configservice.controller.ConfigController#queryConfig
方法中。
代碼如下:
// 這個 . 號是防止 Spring 框架去除了 . 號后面的字符,例如 xxx.json, xxx.properties
@RequestMapping(value = "/{appId}/{clusterName}/{namespace:.+}", method = RequestMethod.GET)
public ApolloConfig queryConfig(@PathVariable String appId, @PathVariable String clusterName,
@PathVariable String namespace,
@RequestParam(value = "dataCenter", required = false) String dataCenter,
@RequestParam(value = "releaseKey", defaultValue = "-1") String clientSideReleaseKey,//20180704093033-648d208dc9c1c9be
@RequestParam(value = "ip", required = false) String clientIp,
@RequestParam(value = "messages", required = false) String messagesAsString,//{"details":{"SampleApp+default+application":19}}
HttpServletRequest request, HttpServletResponse response) throws IOException {
String originalNamespace = namespace;
//strip out .properties suffix 剔除后綴
namespace = namespaceUtil.filterNamespaceName(namespace);
//fix the character case issue, such as FX.apollo <-> fx.apollo
// 改名字
namespace = namespaceUtil.normalizeNamespace(appId, namespace);
if (Strings.isNullOrEmpty(clientIp)) {
clientIp = tryToGetClientIp(request);// 獲取客戶端 IP
}
// 轉換成對象(目前沒有地方用到?)
ApolloNotificationMessages clientMessages = transformMessages(messagesAsString);
List<Release> releases = Lists.newLinkedList();
String appClusterNameLoaded = clusterName;
// appID 如果不是占位符號
if (!ConfigConsts.NO_APPID_PLACEHOLDER.equalsIgnoreCase(appId)) {
// 獲取發布信息
Release currentAppRelease = configService.loadConfig(appId, clientIp, appId, clusterName, namespace,
dataCenter, clientMessages);
if (currentAppRelease != null) {
releases.add(currentAppRelease);// 添加進集合
//we have cluster search process, so the cluster name might be overridden 這個解釋看不懂??? 集群搜索過程指的是?
appClusterNameLoaded = currentAppRelease.getClusterName();// 使用release 的集群名稱.
}
}
// 關聯類型?
//if namespace does not belong to this appId, should check if there is a public configuration
// 如果命名空間不屬於這個 appId, 那么應該檢查他是否是公共配置
if (!namespaceBelongsToAppId(appId, namespace)) {
Release publicRelease = this.findPublicConfig(appId, clientIp, clusterName, namespace,
dataCenter, clientMessages);// 獲取公共配置
if (!Objects.isNull(publicRelease)) {
releases.add(publicRelease);// 添加進集合
}
}
if (releases.isEmpty()) {// 空的話,返回 404
response.sendError(HttpServletResponse.SC_NOT_FOUND,//404
String.format(
"Could not load configurations with appId: %s, clusterName: %s, namespace: %s",
appId, clusterName, originalNamespace));
Tracer.logEvent("Apollo.Config.NotFound",
assembleKey(appId, clusterName, originalNamespace, dataCenter));
return null;
}
auditReleases(appId, clusterName, dataCenter, clientIp, releases);// 保存實例信息,(就是配置灰度規則時的ip選擇列表+ 實例列表)
// + 號拼接所有的 release key
String mergedReleaseKey = releases.stream().map(Release::getReleaseKey)
.collect(Collectors.joining(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR));
if (mergedReleaseKey.equals(clientSideReleaseKey)) {// 如果客戶端那邊的 key 和這邊的 key 一致,則 304
// Client side configuration is the same with server side, return 304
response.setStatus(HttpServletResponse.SC_NOT_MODIFIED);// 返回 304
Tracer.logEvent("Apollo.Config.NotModified",
assembleKey(appId, appClusterNameLoaded, originalNamespace, dataCenter));
return null;
}
// releaseKey 不一致,則創建 config 對象返回.
ApolloConfig apolloConfig = new ApolloConfig(appId, appClusterNameLoaded, originalNamespace,
mergedReleaseKey);
apolloConfig.setConfigurations(mergeReleaseConfigurations(releases));// 合並所有的配置, 其中,私有配置優先公共配置
Tracer.logEvent("Apollo.Config.Found", assembleKey(appId, appClusterNameLoaded,
originalNamespace, dataCenter));
return apolloConfig;
}
代碼有點長,具體細節等下慢慢聊,這里說說主要邏輯:
-
調整 namespace 的名字。獲取客戶端 IP(為了灰度)。
-
判斷 appId 是不是占位符。如果不是,就嘗試加載該 AppId 下的 Cluster 下的 namespace 的 release 配置。並添加進結果集。
-
判斷是否是公共 namespac, 假設這個 namespace 不屬於當前 AppId,那么就是公共配置,需要加載公共配置(通常就是管理的 namespace)。注意:這個時候,可能會有 2 個結果集:當前 AppId 發布的
重寫公共配置的配置
+ 公共配置。 -
如果結果集合是空,返回 404。
-
auditReleases 方法會異步的保存此次客戶端獲取配置的詳細信息到數據庫中,portal 頁面就可以看到這些信息了。
-
比較服務端的 key 和客戶端的 key 是否相同,因為每次發布配置都會有一個唯一的 key 生成,這里比較一下,就可以知道配置是否發生更改,如果相同,返回 304.
-
如果不同,構造一個 Config 對象返回給客戶端。這里有個注意的地方:
mergeReleaseConfigurations
方法會將 release 集合反轉一下,目的是讓私有的重寫配置優先於公共的配置。
目前來看,不是很復雜,主要就是根據指定的 namespace 加載配置,並和客戶端的 key 進行比較。如果不同,就返回新的配置。
2.1 聊聊細節
步驟1,2 都是處理 namespace,大小寫,后綴什么的,優先使用服務端的名稱。
轉換了 messagesAsString 為 ApolloNotificationMessages 對象,目前沒用到。
可以看到,比較重要的方法就是 configService.loadConfig 方法。這個方法是獲取配置的核心方法。下面的獲取公共配置的 findPublicConfig 方法內部也是調用的此方法。
然后還有 auditReleases 方法,這個其實就是記錄此次客戶端獲取配置的詳細信息的。
然后還有一個反轉方法。這個很簡單,大家可以自己看看。
先看看 configService.loadConfig 方法。
2.2 loadConfig() 加載配置
說方法之前,先看看 ConfigService 這個接口。最上層的是監聽器接口,用於監聽消息變化。然后是 ConfigService 接口,定義 loadConfig 方法並返回 release 對象。
最下面是具體實現類,抽象類是用了模板模式,定義了獲取配置的骨架,下面則有 2 個實現類,一個基於緩存,一個基於 DB。默認是 DB。具體使用哪個類要看 server_config 表里的配置,配置的 key 是 config-service.cache.enabled
,value 要么 true 要么 false。
注意,使用緩存很耗費內存,小心 OOM 哦。
看看抽象類里 loadConfig 方法的實現:
@Override
public Release loadConfig(String clientAppId, String clientIp, String configAppId, String configClusterName,
String configNamespace, String dataCenter, ApolloNotificationMessages clientMessages) {
// load from specified cluster fist 如果不是默認的 cluster(私有或者灰度)
if (!Objects.equals(ConfigConsts.CLUSTER_NAME_DEFAULT, configClusterName)) {
Release clusterRelease = findRelease(clientAppId, clientIp, configAppId, configClusterName, configNamespace,
clientMessages);
if (!Objects.isNull(clusterRelease)) {
return clusterRelease;
}
}
// try to load via data center 試圖通過數據中心獲取
if (!Strings.isNullOrEmpty(dataCenter) && !Objects.equals(dataCenter, configClusterName)) {
Release dataCenterRelease = findRelease(clientAppId, clientIp, configAppId, dataCenter, configNamespace,
clientMessages);
if (!Objects.isNull(dataCenterRelease)) {
return dataCenterRelease;
}
}
// fallback to default release 從默認的 cluster 獲取
return findRelease(clientAppId, clientIp, configAppId, ConfigConsts.CLUSTER_NAME_DEFAULT, configNamespace,
clientMessages);
}
步驟:首先加載私有/灰度的 cluster,這個就是客戶端配置文件里的 apollo.cluster
配置, 然后再加載 server.properties
配置文件的 idc 屬性,最后加載默認的。
他們的優先級如圖:
細心的你可以發現,3 個 if 判斷力都是調用的 findRelease 方法,只是第四個參數不同,這個參數就是 configClusterName —— 不同的 cluster。
這個方法首先或加載灰度的,然后再加載普通的。所以,客戶端的 IP 就顯得重要了。
private Release findRelease(String clientAppId, String clientIp, String configAppId, String configClusterName,
String configNamespace, ApolloNotificationMessages clientMessages) {
// 獲取灰度release id, 從應用的緩存中獲取規則.
Long grayReleaseId = grayReleaseRulesHolder.findReleaseIdFromGrayReleaseRule(clientAppId, clientIp, configAppId,
configClusterName, configNamespace);
Release release = null;
if (grayReleaseId != null) {// 如果有灰度
// return releaseRepository.findByIdAndIsAbandonedFalse(releaseId);// 沒有放有回滾的發布
release = findActiveOne(grayReleaseId, clientMessages); // 獲取灰度 release
}
if (release == null) {// 如果沒有發布的新灰度
// 獲取最新的普通的發布活動
release = findLatestActiveRelease(configAppId, configClusterName, configNamespace, clientMessages);
}
// 灰度 --> 最新的
return release;
}
步驟:調用grayReleaseRulesHolder
的findReleaseIdFromGrayReleaseRule
方法獲取灰度發布 ID。當然,這也是個緩存。
如果有灰度,則根據 id 獲取對應的 release 信息,得到配置,release 里面包含了全量的配置信息(從數據庫獲取)。
如果沒有灰度,則獲取最新的普通的發布信息(從數據庫獲取)。
關鍵在於獲取灰度 id。
看看這個方法:
public Long findReleaseIdFromGrayReleaseRule(String clientAppId, String clientIp, String
configAppId, String configCluster, String configNamespaceName) {
String key = assembleGrayReleaseRuleKey(configAppId, configCluster, configNamespaceName);// 組裝 key
if (!grayReleaseRuleCache.containsKey(key)) {// 從緩存中獲取, 緩存是個 handler 監聽器 + 定時任務
return null;
}
//create a new list to avoid ConcurrentModificationException 如果存在,就處理
List<GrayReleaseRuleCache> rules = Lists.newArrayList(grayReleaseRuleCache.get(key));
for (GrayReleaseRuleCache rule : rules) {
//check branch status 必須是激活的狀態
if (rule.getBranchStatus() != NamespaceBranchStatus.ACTIVE) {
continue;
}// 如果匹配上了 ip 和 客戶端的 appId, 就返回
if (rule.matches(clientAppId, clientIp)) {
return rule.getReleaseId();
}
}
return null;
}
步驟:首先用 + 號將 appId,cluster,namespace 拼接,再從緩存中獲取,獲取的是該 key 對應的灰度規則。
而緩存由一個定時任務更新(60s) + 監聽器更新。
如果存在,就循環比較規則,如果規則是激活狀態,且 appId 和 ip 和當前客戶端匹配,那么就返回這個灰度的 release Id。
這個就是得到灰度 release Id 的具體邏輯,可以看到,這里是優先加載灰度的。
那么這個定時任務 + 監聽器具體是怎么樣的呢?
剛剛說了,定時任務是 60 s 一次,相對於配置中心來說,及時性肯定是不夠的,所以,他更多的是一種補償措施,即監聽器失效了,定時任務能夠保證 60s 內配置是最新的。
而監聽器才是最新的配置。具體方法則是 handleMessage 方法。這個方法會得到一個發布消息,包含 appId + clusterName + namespace,有了這個信息,就可以得到 release 信息了。
每當發布一個配置 ,或回滾一個配置,都會發送一個消息到數據庫,ConfigService 會掃描得到這個消息,然后通知所有的監聽器。執行監聽器的 handlerMessage 方法。
在灰度規則監聽器中,會檢查灰度發布規則表,根據消息的內容(appId + cluster + namespace 組成的唯一 namespace key
)並進行處理,處理的邏輯則是更新緩存中的規則內容。
總結一下這個 loadConfig 方法:
這是 ConfigService 接口定義的方法,由 一個抽象類和 2 個派生類組成,默認使用 DB 模式的派生類,抽象類定義了 loadConfig 的方法骨架,利用模板模式,2 個子類可以根據自己的特性返回數據 —— release。loadConfig 里,在獲取 release 的時候,會有一個查找順序,首先找私有/灰度的 cluster,然后找 idc(這個一般公司用不到,攜程內部的特性),最后找默認的。而他們調用的 findRelease 方法內部也會有一個查找順序:首先根據灰度規則查找灰度發布 ID,如果沒有,查找默認的最新發布 —— 也就是灰度的規則比默認的規則高。
2.3 auditReleases() 方法記錄此次訪問詳情
這個方法主要是調用 instanceConfigAuditUtil 的 audit 方法:
private void auditReleases(String appId, String cluster, String dataCenter, String clientIp,
List<Release> releases) {
if (Strings.isNullOrEmpty(clientIp)) {
//no need to audit instance config when there is no ip
return;
}
for (Release release : releases) {
instanceConfigAuditUtil.audit(appId, cluster, dataCenter, clientIp, release.getAppId(),
release.getClusterName(),
release.getNamespaceName(), release.getReleaseKey());
}
}
注意:這里的判斷,如果沒有 ip, 就不記錄了。這里用的是 aduit 審計的概念,我想就是類似記錄吧,方便后面進行復盤,查看啥的。
而這個 audit 方法具體的內容則是:構造一個 InstanceConfigAuditModel 對象放到一個阻塞隊列中,由另一個線程異步處理。
public boolean audit(String appId, String clusterName, String dataCenter, String
ip, String configAppId, String configClusterName, String configNamespace, String releaseKey) {
return this.audits.offer(new InstanceConfigAuditModel(appId, clusterName, dataCenter, ip,
configAppId, configClusterName, configNamespace, releaseKey));
}
// 長度為 10000 的阻塞隊列
private BlockingQueue<InstanceConfigAuditModel> audits = Queues.newLinkedBlockingQueue
(10000);
那么異步執行的內容是怎么樣的呢?
當然要看隊列 poll 或者 take 后做什么了.
@Override
public void afterPropertiesSet() throws Exception {
auditExecutorService.submit(() -> {
while (!auditStopped.get() && !Thread.currentThread().isInterrupted()) {
try {
InstanceConfigAuditModel model = audits.poll();
if (model == null) {
TimeUnit.SECONDS.sleep(1);
continue;
}
doAudit(model);
} catch (Throwable ex) {
Tracer.logError(ex);
}
}
});
}
該類實現了 Spring 的 InitializingBean 接口,重寫了 afterPropertiesSet 方法,這個方法會在屬性注入完畢后執行。
方法其實就是提交了一個任務,任務內容則是從隊列中取出對象,然后執行 doAudit 方法。如果取出是空,休眠 1 秒。
這個方法的主要內容就是更新客戶端訪問信息,或者創建客戶端訪問信息。使用 2 個緩存,存儲 instanceId 和 releaseKey,這個是為了提高校驗數據的性能。
而這個實例在數據庫是這樣的:
從 Instance 表結構看,記錄是每台機器最新訪問的記錄,而 InstanceConfig 則是記錄的此次訪問的具體 namespace 的 發布信息。
對應的是控制台的實例列表:
關於這個方法的具體內容我就不貼了,感興趣的可以自己看看,主要內容就是記錄 Instance 的訪問信息用於后台審計查看。
總結
好了,apollo 客戶端訪問 ConfigService 獲取配置的大概思路和具體細節就介紹完了。這里總結一下。
-
獲取配置的時候,可能會有 2 個結果集(關聯類型),那么會將私有的優先(放到前面)。如果集合是空,返回 404 ,如果沒有新的發布信息,返回 304.
-
當服務器加載配置信息的時候,有幾個順序,特別是集群的順序:私有/灰度 cluster (apollo.cluster)----> 數據中心(server.properties 的 idc)-----> 默認的 cluster。同時,加載集群內部配置的時候,也會優先加載灰度的配置(根據 IP),然后才是默認的配置。
-
最后,會記錄此次訪問的信息,方便后台審計。如果是 10 分鍾之內訪問的,即不會更新。