數據同步工具之FlinkCDC/Canal/Debezium對比


前言

數據准實時復制(CDC)是目前行內實時數據需求大量使用的技術,隨着國產化的需求,我們也逐步考慮基於開源產品進行准實時數據同步工具的相關開發,逐步實現對商業產品的替代。本文把市面上常見的幾種開源產品,Canal、Debezium、Flink CDC 從原理和適用做了對比,供大家參考。

Debezium

Debezium is an open source distributed platform for change data capture. Start it up, point it at your databases, and your apps can start responding to all of the inserts, updates, and deletes that other apps commit to your databases. Debezium is durable and fast, so your apps can respond quickly and never miss an event, even when things go wrong.

Debezium是一種CDC(Change Data Capture)工具,工作原理類似大家所熟知的Canal, DataBus, Maxwell等,是通過抽取數據庫日志來獲取變更。

Debezium最初設計成一個Kafka Connect 的Source Plugin,目前開發者雖致力於將其與Kafka Connect解耦,但當前的代碼實現還未變動。下圖引自Debeizum官方文檔,可以看到一個Debezium在一個完整CDC系統中的位置。

Kafka Connect 為Source Plugin提供了一系列的編程接口,最主要的就是要實現SourceTask的poll方法,其返回List 將會被以最少一次語義的方式投遞至Kafka。

Debezium MySQL 架構

Debezium抽取原理

Reader體系構成了MySQL模塊中代碼的主線,我們的分析從Reader開始。

Reader繼承關系

從名字上應該可以看出,真正主要的是SnapshotReader和BinlogReader,分別實現了對MySQL數據的全量讀取和增量讀取,他們繼承於AbstractReader,里面封裝了共用邏輯,下圖是AbstractReader的內部設計。

可以看到,AbstractReader在實現時,並沒有直接將enqueue喂進來的record投遞進Kafka,而是通過一個內存阻塞隊列BlockingQueue進行了解耦,這種設計有諸多好處:

  1. 職責解耦

如上的圖中,在喂入BlockingQueue之前,要根據條件判斷是否接受該record;在向Kafka投遞record之前,判斷task的running狀態。這樣把同類的功能限定在特定的位置。

  1. 線程隔離

BlockingQueue是一個線程安全的阻塞隊列,通過BlockingQueue實現的生產者消費者模型,是可以跑在不同的線程里的,這樣避免局部的阻塞帶來的整體的干擾。如上圖中的右側,消費者會定期判斷running標志位,若running被stop信號置為了false,可以立刻停止整個task,而不會因MySQL IO阻塞延遲相應。

  1. Single與Batch的互相轉化

Enqueue record是單條的投遞record,drain_to是批量的消費records。這個用法也可以反過來,實現batch到single的轉化。

可能你還知道阿里開源的另一個MySQL CDC工具canal,他只負責stream過程,並沒有處理snapshot過程,這也是debezium相較於canal的一個優勢。

對於Debezium來說,基本沿用了官方搭建從庫的這一思路,讓我們看下官方文檔描述的詳細步驟。

MySQL連接器每次獲取快照的時候會執行以下的步驟:

  1. 獲取一個全局讀鎖,從而阻塞住其他數據庫客戶端的寫操作。
  2. 開啟一個可重復讀語義的事務,來保證后續的在同一個事務內讀操作都是在一個一致性快照中完成的。
  3. 讀取binlog的當前位置。
  4. 讀取連接器中配置的數據庫和表的模式(schema)信息。
  5. 釋放全局讀鎖,允許其他的數據庫客戶端對數據庫進行寫操作。
  6. (可選)把DDL改變事件寫入模式改變topic(schema change topic),包括所有的必要的DROP和CREATEDDL語句。
  7. 掃描所有數據庫的表,並且為每一個表產生一個和特定表相關的kafka topic創建事件(即為每一個表創建一個kafka topic)。
  8. 提交事務。
  9. 記錄連接器成功完成快照任務時的連接器偏移量。

部署

基於 Kafka Connect

最常見的架構是通過 Apache Kafka Connect 部署 Debezium。Kafka Connect 為在 Kafka 和外部存儲系統之間系統數據提供了一種可靠且可伸縮性的方式。它為 Connector 插件提供了一組 API 和一個運行時:Connect 負責運行這些插件,它們則負責移動數據。通過 Kafka Connect 可以快速實現 Source Connector 和 Sink Connector 進行交互構造一個低延遲的數據 Pipeline:

  • Source Connector(例如,Debezium):將記錄發送到 Kafka
  • Sink Connector:將 Kafka Topic 中的記錄發送到其他系統

如上圖所示,部署了 MySQL 和 PostgresSQL 的 Debezium Connector 以捕獲這兩種類型數據庫的變更。每個 Debezium Connector 都會與其源數據庫建立連接:

  • MySQL Connector 使用客戶端庫來訪問 binlog。
  • PostgreSQL Connector 從邏輯副本流中讀取數據。

除了 Kafka Broker 之外,Kafka Connect 也作為一個單獨的服務運行。默認情況下,數據庫表的變更會寫入名稱與表名稱對應的 Kafka Topic 中。如果需要,您可以通過配置 Debezium 的 Topic 路由轉換來調整目標 Topic 名稱。例如,您可以:

  • 將記錄路由到名稱與表名不同的 Topic 中
  • 將多個表的變更事件記錄流式傳輸到一個 Topic 中

變更事件記錄在 Apache Kafka 中后,Kafka Connect 生態系統中的不同 Sink Connector 可以將記錄流式傳輸到其他系統、數據庫,例如 Elasticsearch、數據倉庫、分析系統或者緩存(例如 Infinispan)。

Debezium Server

另一種部署 Debezium 的方法是使用 Debezium Server。Debezium Server 是一個可配置的、隨時可用的應用程序,可以將變更事件從源數據庫流式傳輸到各種消息中間件上。

下圖展示了基於 Debezium Server 的變更數據捕獲 Pipeline 架構:

Debezium Server 配置使用 Debezium Source Connector 來捕獲源數據庫中的變更。變更事件可以序列化為不同的格式,例如 JSON 或 Apache Avro,然后發送到各種消息中間件,例如 Amazon Kinesis、Google Cloud Pub/Sub 或 Apache Pulsar。

嵌入式引擎

使用 Debezium Connector 的另一種方法是嵌入式引擎。在這種情況下,Debezium 不會通過 Kafka Connect 運行,而是作為嵌入到您自定義 Java 應用程序中的庫運行。這對於在您的應用程序本身內獲取變更事件非常有幫助,無需部署完整的 Kafka 和 Kafka Connect 集群,也不用將變更流式傳輸到 Amazon Kinesis 等消息中間件上。

特性

Debezium 是一組用於 Apache Kafka Connect 的 Source Connector。每個 Connector 都通過使用該數據庫的變更數據捕獲 (CDC) 功能從不同的數據庫中獲取變更。與其他方法(例如輪詢或雙重寫入)不同,Debezium 的實現基於日志的 CDC:

  • 確保捕獲所有的數據變更。
  • 以極低的延遲生成變更事件,同時避免因為頻繁輪詢導致 CPU 使用率增加。例如,對於 MySQL 或 PostgreSQL,延遲在毫秒范圍內。
  • 不需要更改您的數據模型,例如 ‘Last Updated’ 列。
  • 可以捕獲刪除操作。
  • 可以捕獲舊記錄狀態以及其他元數據,例如,事務 ID,具體取決於數據庫的功能和配置。
  • 2020 年 7 月提交了第一個 commit,這是基於個人興趣孵化的項目;
  • 2020 年 7 中旬支持了 MySQL-CDC;
  • 2020 年 7 月末支持了 Postgres-CDC;

一年的時間,該項目在 GitHub 上的 star 數已經超過 800。

Flink CDC 底層封裝了 Debezium, Debezium 同步一張表分為兩個階段:

  • 全量階段:查詢當前表中所有記錄;
  • 增量階段:從 binlog 消費變更數據。

大部分用戶使用的場景都是全量 + 增量同步,加鎖是發生在全量階段,目的是為了確定全量階段的初始位點,保證增量 + 全量實現一條不多,一條不少,從而保證數據一致性。從下圖中我們可以分析全局鎖和表鎖的一些加鎖流程,左邊紅色線條是鎖的生命周期,右邊是 MySQL 開啟可重復讀事務的生命周期。

以全局鎖為例,首先是獲取一個鎖,然后再去開啟可重復讀的事務。這里鎖住操作是讀取 binlog 的起始位置和當前表的 schema。這樣做的目的是保證 binlog 的起始位置和讀取到的當前 schema 是可以對應上的,因為表的 schema 是會改變的,比如如刪除列或者增加列。在讀取這兩個信息后,SnapshotReader 會在可重復讀事務里讀取全量數據,在全量數據讀取完成后,會啟動 BinlogReader 從讀取的 binlog 起始位置開始增量讀取,從而保證全量數據 + 增量數據的無縫銜接。

表鎖是全局鎖的退化版,因為全局鎖的權限會比較高,因此在某些場景,用戶只有表鎖。表鎖鎖的時間會更長,因為表鎖有個特征:鎖提前釋放了可重復讀的事務默認會提交,所以鎖需要等到全量數據讀完后才能釋放。

經過上面分析,接下來看看這些鎖到底會造成怎樣嚴重的后果:

Flink CDC 1.x 可以不加鎖,能夠滿足大部分場景,但犧牲了一定的數據准確性。Flink CDC 1.x 默認加全局鎖,雖然能保證數據一致性,但存在上述 hang 住數據的風險。

Flink CDC 1.x得到了很多用戶在社區的反饋,主要歸納為三個:

  • 全量 + 增量讀取的過程需要保證所有數據的一致性,因此需要通過加鎖保證,但是加鎖在數據庫層面上是一個十分高危的操作。底層 Debezium 在保證數據一致性時,需要對讀取的庫或表加鎖,全局鎖可能導致數據庫鎖住,表級鎖會鎖住表的讀,DBA 一般不給鎖權限。
  • 不支持水平擴展,因為 Flink CDC 底層是基於 Debezium,起架構是單節點,所以Flink CDC 只支持單並發。在全量階段讀取階段,如果表非常大 (億級別),讀取時間在小時甚至天級別,用戶不能通過增加資源去提升作業速度。
  • 全量讀取階段不支持 checkpoint:CDC 讀取分為兩個階段,全量讀取和增量讀取,目前全量讀取階段是不支持 checkpoint 的,因此會存在一個問題:當我們同步全量數據時,假設需要 5 個小時,當我們同步了 4 小時的時候作業失敗,這時候就需要重新開始,再讀取 5 個小時。

通過上面的分析,可以知道 2.0 的設計方案,核心要解決上述的三個問題,即支持無鎖、水平擴展、checkpoint。

目前,Flink CDC 2.0 也已經正式發布,此次的核心改進和提升包括:

  • 並發讀取,全量數據的讀取性能可以水平擴展;
  • 全程無鎖,不對線上業務產生鎖的風險;
  • 斷點續傳,支持全量階段的 checkpoint。

Canal

canal [kə'næl],譯意為水道/管道/溝渠,主要用途是基於 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費。

早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基於業務 trigger 獲取增量變更。從 2010 年開始,業務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。

基於日志增量訂閱和消費的業務包括:

  • 數據庫鏡像
  • 數據庫實時備份
  • 索引構建和實時維護(拆分異構索引、倒排索引等)
  • 業務 cache 刷新
  • 帶業務邏輯的增量數據處理

當前的canal支持源端MySQL版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。

工作原理

MySQL主備復制原理

  • MySQL master 將數據變更寫入二進制日志( binary log, 其中記錄叫做二進制日志事件binary log events,可以通過 show binlog events 進行查看)
  • MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)
  • MySQL slave 重放 relay log 中事件,將數據變更反映它自己的數據

canal 工作原理

  • canal 模擬 MySQL slave 的交互協議,偽裝自己為MySQL slave,向MySQL master發送dump協議
  • MySQL master收到 dump 請求,開始推送 binary log 給 slave (即 canal )
  • canal 解析 binary log 對象(原始為 byte 流)

Binlog獲取詳解

Binlog發送接收流程,流程如下圖所示:

首先,我們需要偽造一個slave,向master注冊,這樣master才會發送binlog event。注冊很簡單,就是向master發送COM_REGISTER_SLAVE命令,帶上slave相關信息。這里需要注意,因為在MySQL的replication topology中,都需要使用一個唯一的server id來區別標示不同的server實例,所以這里我們偽造的slave也需要一個唯一的server id。

接着實現binlog的dump。MySQL只支持一種binlog dump方式,也就是指定binlog filename + position,向master發送COM_BINLOG_DUMP命令。在發送dump命令的時候,我們可以指定flag為BINLOG_DUMP_NON_BLOCK,這樣master在沒有可發送的binlog event之后,就會返回一個EOF package。不過通常對於slave來說,一直把連接掛着可能更好,這樣能更及時收到新產生的binlog event。

Dump命令包圖如下所示:

如上圖所示,在報文中塞入binlogPosition和binlogFileName即可讓master從相應的位置發送binlog event。

canal結構

說明:

  • server代表一個canal運行實例,對應於一個jvm,也可以理解為一個進程
  • instance對應於一個數據隊列 (1個server對應1..n個instance),每一個數據隊列可以理解為一個數據庫實例。

Server設計

server代表了一個canal的運行實例,為了方便組件化使用,特意抽象了Embeded(嵌入式) / Netty(網絡訪問)的兩種實現

  • Embeded : 對latency和可用性都有比較高的要求,自己又能hold住分布式的相關技術(比如failover)
  • Netty : 基於netty封裝了一層網絡協議,由canal server保證其可用性,采用的pull模型,當然latency會稍微打點折扣,不過這個也視情況而定。(阿里系的notify和metaq,典型的push/pull模型,目前也逐步的在向pull模型靠攏,push在數據量大的時候會有一些問題)

Instance設計

instance代表了一個實際運行的數據隊列,包括了EventPaser,EventSink,EventStore等組件。

抽象了CanalInstanceGenerator,主要是考慮配置的管理方式:

manager方式:和你自己的內部web console/manager系統進行對接。(目前主要是公司內部使用,Otter采用這種方式) spring方式:基於spring xml + properties進行定義,構建spring配置.

下面是canalServer和instance如何運行:

canalServer.setCanalInstanceGenerator(new CanalInstanceGenerator() {

            public CanalInstance generate(String destination) {
                Canal canal = canalConfigClient.findCanal(destination);
                // 此處省略部分代碼 大致邏輯是設置canal一些屬性

                CanalInstanceWithManager instance = new CanalInstanceWithManager(canal, filter) {

                    protected CanalHAController initHaController() {
                        HAMode haMode = parameters.getHaMode();
                        if (haMode.isMedia()) {
                            return new MediaHAController(parameters.getMediaGroup(),
                                parameters.getDbUsername(),
                                parameters.getDbPassword(),
                                parameters.getDefaultDatabaseName());
                        } else {
                            return super.initHaController();
                        }
                    }

                    protected void startEventParserInternal(CanalEventParser parser, boolean isGroup) {
                        //大致邏輯是 設置支持的類型
                        //初始化設置MysqlEventParser的主庫信息,這處抽象不好,目前只支持mysql
                    }

                };
                return instance;
            }
        });
        canalServer.start(); //啟動canalServer

        canalServer.start(destination);//啟動對應instance
        this.clientIdentity = new ClientIdentity(destination, pipeline.getParameters().getMainstemClientId(), filter);
        canalServer.subscribe(clientIdentity);// 發起一次訂閱,當監聽到instance配置時,調用generate方法注入新的instance

instance模塊:

  • eventParser (數據源接入,模擬slave協議和master進行交互,協議解析)
  • eventSink (Parser和Store鏈接器,進行數據過濾,加工,分發的工作)
  • eventStore (數據存儲)
  • metaManager (增量訂閱&消費信息管理器)

EventParser設計

大致過程:

整個parser過程大致可分為幾步:

  • Connection獲取上一次解析成功的位置 (如果第一次啟動,則獲取初始指定的位置或者是當前數據庫的binlog位點)
  • Connection建立鏈接,發送BINLOG_DUMP指令
// 0. write command number
// 1. write 4 bytes bin-log position to start at
// 2. write 2 bytes bin-log flags
// 3. write 4 bytes server id of the slave
// 4. write bin-log file name
  • Mysql開始推送Binaly Log
  • 接收到的Binaly Log的通過Binlog parser進行協議解析,補充一些特定信息(補充字段名字,字段類型,主鍵信息,unsigned類型處理)
  • 傳遞給EventSink模塊進行數據存儲,是一個阻塞操作,直到存儲成功
  • 存儲成功后,由CanalLogPositionManager定時記錄Binaly Log位置

EventSink設計

說明:

  • 數據過濾:支持通配符的過濾模式,表名,字段內容等
  • 數據路由/分發:解決1:n (1個parser對應多個store的模式)
  • 數據歸並:解決n:1 (多個parser對應1個store)
  • 數據加工:在進入store之前進行額外的處理,比如join

數據1:n業務

為了合理的利用數據庫資源, 一般常見的業務都是按照schema進行隔離,然后在mysql上層或者dao這一層面上,進行一個數據源路由,屏蔽數據庫物理位置對開發的影響,阿里系主要是通過cobar/tddl來解決數據源路由問題。

所以,一般一個數據庫實例上,會部署多個schema,每個schema會有由1個或者多個業務方關注。

數據n:1業務

同樣,當一個業務的數據規模達到一定的量級后,必然會涉及到水平拆分和垂直拆分的問題,針對這些拆分的數據需要處理時,就需要鏈接多個store進行處理,消費的位點就會變成多份,而且數據消費的進度無法得到盡可能有序的保證。

所以,在一定業務場景下,需要將拆分后的增量數據進行歸並處理,比如按照時間戳/全局id進行排序歸並。

EventStore設計

  1. 目前僅實現了Memory內存模式,后續計划增加本地file存儲,mixed混合模式。
  2. 借鑒了Disruptor的RingBuffer的實現思路

RingBuffer設計:

定義了3個cursor

Put : Sink模塊進行數據存儲的最后一次寫入位置 Get : 數據訂閱獲取的最后一次提取位置 Ack : 數據消費成功的最后一次消費位置

借鑒Disruptor的RingBuffer的實現,將RingBuffer拉直來看:

實現說明:

Put/Get/Ack cursor用於遞增,采用long型存儲buffer的get操作,通過取余或者與操作。(與操作:cusor & (size - 1) , size需要為2的指數,效率比較高)

HA機制設計

canal的ha分為兩部分,canal server和canal client分別有對應的ha實現

  • canal server: 為了減少對mysql dump的請求,不同server上的instance要求同一時間只能有一個處於running,其他的處於standby狀態.
  • canal client: 為了保證有序性,一份instance同一時間只能由一個canal client進行get/ack/rollback操作,否則客戶端接收無法保證有序。

整個HA機制的控制主要是依賴了zookeeper的幾個特性,watcher和EPHEMERAL節點(和session生命周期綁定),可以看下我之前zookeeper的相關文章。

Canal Server:

大致步驟:

  • canal server要啟動某個canal instance時都先向zookeeper進行一次嘗試啟動判斷 (實現:創建EPHEMERAL節點,誰創建成功就允許誰啟動)
  • 創建zookeeper節點成功后,對應的canal server就啟動對應的canal instance,沒有創建成功的canal instance就會處於standby狀態
  • 一旦zookeeper發現canal server A創建的節點消失后,立即通知其他的canal server再次進行步驟1的操作,重新選出一個canal server啟動instance
  • canal client每次進行connect時,會首先向zookeeper詢問當前是誰啟動了canal instance,然后和其建立鏈接,一旦鏈接不可用,會重新嘗試connect

Canal Client的方式和canal server方式類似,也是利用zookeeper的搶占EPHEMERAL節點的方式進行控制。

總結

CDC 的技術方案非常多,目前業界主流的實現機制可以分為兩種:

基於查詢的 CDC:

  • 離線調度查詢作業,批處理。把一張表同步到其他系統,每次通過查詢去獲取表中最新的數據;
  • 無法保障數據一致性,查的過程中有可能數據已經發生了多次變更;
  • 不保障實時性,基於離線調度存在天然的延遲。

基於日志的 CDC:

  • 實時消費日志,流處理,例如 MySQL 的 binlog 日志完整記錄了數據庫中的變更,可以把 binlog 文件當作流的數據源;
  • 保障數據一致性,因為 binlog 文件包含了所有歷史變更明細;
  • 保障實時性,因為類似 binlog 的日志文件是可以流式消費的,提供的是實時數據。

對比常見的開源 CDC 方案,我們可以發現:

  • 對比增量同步能力:
  • 基於日志的方式,可以很好的做到增量同步;
  • 而基於查詢的方式是很難做到增量同步的。
  • 對比全量同步能力,基於查詢或者日志的 CDC 方案基本都支持,除了 Canal。
  • 而對比全量 + 增量同步的能力,只有 Flink CDC、Debezium、Oracle Goldengate 支持較好。
  • 從架構角度去看,該表將架構分為單機和分布式,這里的分布式架構不單純體現在數據讀取能力的水平擴展上,更重要的是在大數據場景下分布式系統接入能力。例如 Flink CDC 的數據入湖或者入倉的時候,下游通常是分布式的系統,如 Hive、HDFS、Iceberg、Hudi 等,那么從對接入分布式系統能力上看,Flink CDC 的架構能夠很好地接入此類系統。
  • 在數據轉換 / 數據清洗能力上,當數據進入到 CDC 工具的時候是否能較方便的對數據做一些過濾或者清洗,甚至聚合?
  • 在 Flink CDC 上操作相當簡單,可以通過 Flink SQL 去操作這些數據;
  • 但是像 DataX、Debezium 等則需要通過腳本或者模板去做,所以用戶的使用門檻會比較高。

另外,在生態方面,這里指的是下游的一些數據庫或者數據源的支持。Flink CDC 下游有豐富的 Connector,例如寫入到 TiDB、MySQL、Pg、HBase、Kafka、ClickHouse 等常見的一些系統,也支持各種自定義 connector。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM