實時數據訂閱與分發系統可以將業務數據源變更實時分發分發到消息總線上,並維護消息的統一格式,提供通用的客戶端框架供消息生產者與下游業務接入。
一般能用於以下場景:
-
索引構建:MySQL到ES
-
緩存管理:MySQL到Redis或本地cache
-
數據庫鏡像
-
實時備份
-
價格變化等重要業務信息訂閱
實時數據訂閱與分發系統一般都有如下幾個核心模塊構成:
- Change Data Capture(變更數據抓取,CDC):負責實時抓取業務數據源的變更消息;
- 消息中間件:支撐消息的分發與堆積;
- Client:為生產者與消費者提供統一的接入途徑,解決序列化、offset管理、監控報警等共性問題。
本文以Databus(
https://github.com/linkedin/databus)為例,來說明一個實時數據訂閱與分發系統的基本構成與原理。
1. 簡介
Databus 是一個實時的低延遲數據抓取系統, 它抓取業務數據源的實時變更, 並發送到中繼(Databus Relay), 下游業務從中繼獲得變更數據進行業務處理:
根據Linkdin的介紹, Databus有以下特性:
-
來源獨立:Databus支持多種數據來源的變更抓取,包括Oracle和MySQL。
-
可擴展、高可用:Databus能擴展到支持數千消費者和事務數據來源,同時保持高度可用性。
-
事務按序提交:Databus能保持來源數據庫中的事務完整性,並按照事務分組和來源的提交順序交付變更事件。
-
低延遲:數據源變更完成后,Databus能在微秒級內將事務提交給消費者。
-
無限回溯:Databus對消費者支持無限回溯能力。當消費者需要產生數據的完整拷貝時(比如新的搜索索引), 直接進行一次全量回溯即可。
2. 系統設計
Databus的結構與工作流如下圖:
-
通過CDC訂閱數據庫變更
-
將變更消息放入Relay的緩存隊列
-
各個client對隊列中的消息進行消費
我們可以看到,核心組件為五個部分:
1)DatabusEventProducer
負責實時數據抓取CDC, 針對MySQL數據源, 開源方案提供了基於OpenReplicator(一個Binlog解析框架)的方案。
2)SchemaRegistry
注冊DatabusEvent對應的Schema, 所有DatabusEvent需要按Schema進行序列化, 並在消息中保持Schema信息。
3)DatabusRelay
基於Netty實現的一個Server, 內部維護高性能的緩存消息隊列RingBuffer,作為訂閱消息的內存消息中間件,保證了消息的有序性。
4)BootstrapService
BootStrapService是特殊的DatabusClient, 它將來自DatabusRelay中的所有數據寫入MySQL, 當客戶端需要無限回溯時, 便請求BootstrapService拉取歷史數據。
有很多系統是將消息直接投遞到kafka或者rocketMQ,就能同時實現了DatabusRelays和BootstrapService的功能。
5)ClientLib:
ClientLib就是消費客戶端Client,用來實時接收變更消息。其中封裝了一些數據抓取細節, 比如當回溯的SCN(System Change Number)在中繼上不存在時自動請求BootstrapService, 回溯完成后切回中繼。
3. 核心模塊淺析
DatabusRelay
DatabusRelay模塊可類比為基於內存實現的消息隊列, 下面是DatabusRelay的結構圖:
我們可以看到,DatabusRelay運行於Netty容器中。
同時,它會啟動一系列EventProducer, 從數據源或其他Relays拉取實時增量數據並寫入EventBuffers。
EventBuffers由多RingBuffer組成, RingBuffer通過mmap進行寫盤持久化。這種設計下,使得EventProducer與DatabusRelay在同一個Netty容器中, 避免了rpc調用,效率更高。
所有的增量數據, 都有一個System Change Number(SCN), 這個SCN由EventProducer產生, 保證全局遞增, DatabusRelay需要記錄每個RingBuffer目前的MaxSCN(類似Kafka的offset), 並使用MaxSCN Reader/Writer進行持久化。持久化方式是本地文件存儲。
DatabusClient
DatabusClient用於消費來自DatabusRelay的數據, 它作為一個lib提供給需要接入的服務。下面是官方給出的DatabusClient架構圖:
客戶端代碼以回調形式注冊到DatabusClient上, 並聲明自己關心的資源。
啟動后, Client通過讀取當前checkpoint, 假如checkpoint在Relay中不存在, 那么啟動Relay Puller 和 Bootstrap Puller分別從Relay和Bootstrap Service拉取數據, 並寫入本地EventBuffer, Dispatcher不斷poll EventBuffer中的數據, 分發到Callback Driver上, 並通知Checkpoint Persistence Provider記錄當前讀取的checkpoint(即SCN)。
這樣就能實現對訂閱消息的全量回溯, 向客戶端代碼屏蔽Relay與Boostrap Service的差異。
4. 擴展性
在上面的DataBus Relay的架構圖可以看到
Event Producer除了可以訂閱數據源之外,還能訂閱其他Relays,可以通過Relay Chaining進行擴展。在Follower Relay中使用RelayEventProducer, 從Master Relay拉取數據, 這兩個Relay就組成了Master和Follower的鏈式結構。當然,這種設計會使得變更數據在多個Relay中冗余,有些浪費空間。
都看到最后了,原創不易,點個關注,點個贊吧~
知識碎片重新梳理,構建Java知識圖譜: github.com/saigu/JavaK…(歷史文章查閱非常方便)