Apollo 7 — ConfigService 消息掃描設計實現


目錄

  1. 設計
  2. 代碼實現
  3. 總結

1.設計

Apollo 為了減少依賴,將本來 MQ 的職責轉移到了 Mysql 中。具體表現為 Mysql 中的 ReleaseMessage 表。

具體官方文檔可見:發送ReleaseMessage的實現方式

用張圖簡單的來表示一下 :

有人肯定要問了,為什么 Admin Service 和 Config Service 不放在一起呢?我曾提過 issue 問過作者,大概的答案是:兩者職責不同,部署的實例數也不同,通常 Admin 會少一些,因為只服務於 Portal,而 Config 則要部署的多一些,因為需要服務於 Client。

第二則是兩者的開發節奏也是不一樣,Config Service 的更新會影響客戶端,而 Admin 的更新則是影響 Portal,所以,分開他們,對服務的部署可以更加細粒度。

關於這個 issue:為什么 admin 和 config 不合在一起呢

高可用的話,可參考下圖或鏈接 可用性考慮:

扯遠了。

回到我們之前說的,之所以要使用 Mysql,是因為要減少依賴,為了替代 MQ,而之所以要使用 MQ,是為了解耦 Config 和 Admin,而之所以要使用 Config 和 Admin,則是因為設計,部署,開發節奏等原因。

那么,基於 Mysql 的消息的實現是怎么弄的呢?

上面是 apollo 文檔對於發送ReleaseMessage的實現方式的描述。

我們來看看代碼實現.

2. 代碼實現

com.ctrip.framework.apollo.biz.message 包下,有關於消息的實現,

麻雀雖小,五臟俱全。

  1. MessageSender 接口定義了一個方法:void sendMessage(String message, String channel);
    這個 channel 就是 topic 了。 message 就是消息的具體內容了。

  2. 目前只有一個實現類 DatabaseMessageSender, 基於數據庫的消息發送。就是把消息保存到 ReleaseMessage 表中。

  3. Topics 定義消息主題,目前只有一個:apollo-release

  4. ReleaseMessageListener 消息監聽器接口,定義了一個方法:void handleMessage(ReleaseMessage message, String channel),需要實現此方法並注冊到掃描器中,當有新的消息時,便會通知監聽器。

  5. ReleaseMessageScanner 消息掃描器,用於掃描數據庫的 ReleaseMessage 表,如果有新數據,則通知監聽器。

目前監聽器有以下實現:

從左到右:

  1. ReleaseMessageServiceWithCache,ReleaseMessage 的緩存,用於長輪詢判斷是否以后新的消息。
  2. GrayReleaseRulesHolder,灰度規則變化監聽器。
  3. ConfigService,分為緩存和默認,默認的 handleMessage 方法什么都不做,緩存實現則會對 cache 熱身。
  4. NotificationControllerV2 監聽器,當客戶端被長連接 Hold 住時,消息如果更新則喚醒客戶端立即返回,保證及時性。
  5. NotificationController 已廢棄,不談了。

所以,每當 ReleaseMessageScanner 得到新的消息,都會觸發這些監聽器。這些監聽器在哪里被添加的呢?

位置:com.ctrip.framework.apollo.configservice.ConfigServiceAutoConfiguration.java

ReleaseMessageScanner 的 afterPropertiesSet 方法會啟動一個間隔 1 秒定時任務,執行 scanMessages 方法。

具體方法如下:

private boolean scanAndSendMessages() {
    //current batch is 500 批處理 500 條
    // 根據 maxIdScanned 找到比這個 id 大的 500 條數據,
    List<ReleaseMessage> releaseMessages =
        releaseMessageRepository.findFirst500ByIdGreaterThanOrderByIdAsc(maxIdScanned);
    if (CollectionUtils.isEmpty(releaseMessages)) {
      return false;
    }
    // 開始通知 handleMessage 監聽器
    fireMessageScanned(releaseMessages);
    int messageScanned = releaseMessages.size();// 消息數量
    maxIdScanned = releaseMessages.get(messageScanned - 1).getId();// 更新最大 id
    
    return messageScanned == 500;// 如果不足 500, 說明沒有新消息了
  }

方法很簡單,首先根據最大的掃描 Id 找到 500 條消息,對這 500 條消息進行批處理,觸發監聽器。最后更新最大掃描 Id。如果此次取出的數據量超過 500 條,則認為還有數據,就繼續處理。

很明顯,每次處理這么多消息肯定很耗時,那么假設處理這些消息要 5 秒,那么定時任務的間隔是多少了呢?答:6 秒。因為定時任務的模式是 scheduleWithFixedDelay 模式,固定的間隔,以當前任務結束時間 + period 時間作為下一個任務的開始時間。

那么,admin 什么時候會向數據庫發送消息(保存 ReleaseMessage)呢?

那么就要查看 sendMessage 方法被哪些地方調用就好了。

  1. NamespaceBranchController
    1.1 更新灰度規則 updateBranchGrayRules
    1.2 刪除灰度分支 deleteBranch

  2. NamespaceService
    2.1 刪除命名空間 NamespaceController#deleteNamespace
    2.2 刪除集群 ClusterController#delete
    2.3 刪除灰度 NamespaceBranchController#deleteBranch
    2.4 刪除命名空間分支NamespaceService#deleteNamespace

  3. ReleaseController
    3.1 主/灰版本發布 publish
    3.2 全量發布 updateAndPublish
    3.3 回滾 rollback

這些地方被觸發的時候,都會發送消息到數據庫。因此,可能會重復觸發,導致 ConfigService 重復消費。。。例如在放棄灰度和全量發布的時候,就會重復發送消息。

筆者就這個問題提了 issue,期待官方 fix 這個 bug。

3. 總結

本文重點分析了 apollo 關系消息這塊的設計: 每當有 app, cluster,namespace, Release 等操作的時候,都會發送消息到數據庫,ConfigService 會定時掃描數據庫,有新消息了,就會立即通知各個監聽器,確保配置實時推送到客戶端。

同時,我們也發現了一個 bug,就是會重復發送消息,引起重復消費。

迫於種種限制,apollo 使用了這種設計,獲取,如果可以使用 MQ 的話,一切會更加的簡單。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM