目錄
- 設計
- 代碼實現
- 總結
1.設計
Apollo 為了減少依賴,將本來 MQ 的職責轉移到了 Mysql 中。具體表現為 Mysql 中的 ReleaseMessage 表。
具體官方文檔可見:發送ReleaseMessage的實現方式
用張圖簡單的來表示一下 :
有人肯定要問了,為什么 Admin Service 和 Config Service 不放在一起呢?我曾提過 issue 問過作者,大概的答案是:兩者職責不同,部署的實例數也不同,通常 Admin 會少一些,因為只服務於 Portal,而 Config 則要部署的多一些,因為需要服務於 Client。
第二則是兩者的開發節奏也是不一樣,Config Service 的更新會影響客戶端,而 Admin 的更新則是影響 Portal,所以,分開他們,對服務的部署可以更加細粒度。
關於這個 issue:為什么 admin 和 config 不合在一起呢
高可用的話,可參考下圖或鏈接 可用性考慮:
扯遠了。
回到我們之前說的,之所以要使用 Mysql,是因為要減少依賴,為了替代 MQ,而之所以要使用 MQ,是為了解耦 Config 和 Admin,而之所以要使用 Config 和 Admin,則是因為設計,部署,開發節奏等原因。
那么,基於 Mysql 的消息的實現是怎么弄的呢?
上面是 apollo 文檔對於發送ReleaseMessage的實現方式
的描述。
我們來看看代碼實現.
2. 代碼實現
在 com.ctrip.framework.apollo.biz.message
包下,有關於消息的實現,
麻雀雖小,五臟俱全。
-
MessageSender 接口定義了一個方法:
void sendMessage(String message, String channel);
這個 channel 就是 topic 了。 message 就是消息的具體內容了。 -
目前只有一個實現類
DatabaseMessageSender
, 基於數據庫的消息發送。就是把消息保存到 ReleaseMessage 表中。 -
Topics 定義消息主題,目前只有一個:
apollo-release
。 -
ReleaseMessageListener 消息監聽器接口,定義了一個方法:
void handleMessage(ReleaseMessage message, String channel)
,需要實現此方法並注冊到掃描器中,當有新的消息時,便會通知監聽器。 -
ReleaseMessageScanner 消息掃描器,用於掃描數據庫的 ReleaseMessage 表,如果有新數據,則通知監聽器。
目前監聽器有以下實現:
從左到右:
- ReleaseMessageServiceWithCache,ReleaseMessage 的緩存,用於長輪詢判斷是否以后新的消息。
- GrayReleaseRulesHolder,灰度規則變化監聽器。
- ConfigService,分為緩存和默認,默認的 handleMessage 方法什么都不做,緩存實現則會對 cache 熱身。
- NotificationControllerV2 監聽器,當客戶端被長連接 Hold 住時,消息如果更新則喚醒客戶端立即返回,保證及時性。
- NotificationController 已廢棄,不談了。
所以,每當 ReleaseMessageScanner 得到新的消息,都會觸發這些監聽器。這些監聽器在哪里被添加的呢?
位置:com.ctrip.framework.apollo.configservice.ConfigServiceAutoConfiguration.java
ReleaseMessageScanner 的 afterPropertiesSet 方法會啟動一個間隔 1 秒定時任務,執行 scanMessages 方法。
具體方法如下:
private boolean scanAndSendMessages() {
//current batch is 500 批處理 500 條
// 根據 maxIdScanned 找到比這個 id 大的 500 條數據,
List<ReleaseMessage> releaseMessages =
releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
if (CollectionUtils.isEmpty(releaseMessages)) {
return false;
}
// 開始通知 handleMessage 監聽器
fireMessageScanned(releaseMessages);
int messageScanned = releaseMessages.size();// 消息數量
maxIdScanned = releaseMessages.get(messageScanned - 1).getId();// 更新最大 id
return messageScanned == 500;// 如果不足 500, 說明沒有新消息了
}
方法很簡單,首先根據最大的掃描 Id 找到 500 條消息,對這 500 條消息進行批處理,觸發監聽器。最后更新最大掃描 Id。如果此次取出的數據量超過 500 條,則認為還有數據,就繼續處理。
很明顯,每次處理這么多消息肯定很耗時,那么假設處理這些消息要 5 秒,那么定時任務的間隔是多少了呢?答:6 秒。因為定時任務的模式是 scheduleWithFixedDelay 模式,固定的間隔,以當前任務結束時間 + period 時間作為下一個任務的開始時間。
那么,admin 什么時候會向數據庫發送消息(保存 ReleaseMessage)呢?
那么就要查看 sendMessage 方法被哪些地方調用就好了。
-
NamespaceBranchController
1.1 更新灰度規則updateBranchGrayRules
1.2 刪除灰度分支deleteBranch
-
NamespaceService
2.1 刪除命名空間NamespaceController#deleteNamespace
2.2 刪除集群ClusterController#delete
2.3 刪除灰度NamespaceBranchController#deleteBranch
2.4 刪除命名空間分支NamespaceService#deleteNamespace
-
ReleaseController
3.1 主/灰版本發布publish
3.2 全量發布updateAndPublish
3.3 回滾rollback
這些地方被觸發的時候,都會發送消息到數據庫。因此,可能會重復觸發,導致 ConfigService 重復消費。。。例如在放棄灰度和全量發布的時候,就會重復發送消息。
筆者就這個問題提了 issue,期待官方 fix 這個 bug。
3. 總結
本文重點分析了 apollo 關系消息這塊的設計: 每當有 app, cluster,namespace, Release 等操作的時候,都會發送消息到數據庫,ConfigService 會定時掃描數據庫,有新消息了,就會立即通知各個監聽器,確保配置實時推送到客戶端。
同時,我們也發現了一個 bug,就是會重復發送消息,引起重復消費。
迫於種種限制,apollo 使用了這種設計,獲取,如果可以使用 MQ 的話,一切會更加的簡單。