華為雲MRS CDL架構設計與實現


1 前言

MRS CDL是華為雲FusionInsight MRS推出的一種數據實時同步服務,旨在將傳統OLTP數據庫中的事件信息捕捉並實時推送到大數據產品中去,本文檔會詳細為大家介紹CDL的整體架構以及關鍵技術。

2 CDL的概念

MRS CDL(Change Data Loader)是一款基於Kafka Connect的CDC數據同步服務,可以從多種OLTP數據源捕獲數據,如Oracle、MySQL、PostgreSQL等,然后傳輸給目標存儲,該目標存儲可以大數據存儲如HDFS,OBS,也可以是實時數據湖Hudi等。

2.1 什么是CDC?

CDC(Change Data Capture)是一種通過監測數據變更(新增、修改、刪除等)而對變更的數據進行進一步處理的一種設計模式,通常應用在數據倉庫以及和數據庫密切相關的一些應用上,比如數據同步、備份、審計、ETL等。

CDC技術的誕生已經有些年頭了,二十多年前,CDC技術就已經用來捕獲應用數據的變更。CDC技術能夠及時有效的將消息同步到對應的數倉中,並且幾乎對當前的生產應用不產生影響。如今,大數據應用越來越普遍,CDC這項古老的技術重新煥發了生機,對接大數據場景已經是CDC技術的新使命。

當前業界已經有許多成熟的CDC to大數據的產品,如:Oracle GoldenGate(for Kafka)、 Ali/Canal、Linkedin/Databus、Debezium/Debezium等等。

2.2 CDL支持的場景

MRS CDL吸收了以上成熟產品的成功經驗,采用Oracle LogMinner和開源的Debezium來進行CDC事件的捕捉,借助Kafka和Kafka Connect的高並發,高吞吐量,高可靠框架進行任務的部署。

現有的CDC產品在對接大數據場景時,基本都會選擇將數據同步到消息隊列Kafka中。MRS CDL在此基礎上進一步提供了數據直接入湖的能力,可以直接對接MRS HDFS和Huawei OBS以及MRS Hudi、ClickHouse等,解決數據的最后一公里問題。

場景 數據源 目標存儲
實時數據湖分析 Oracle Huawei OBS, MRS HDFS, MRS Hudi, MRS ClickHouse, MRS Hive
實時數據湖分析 MySQL Huawei OBS, MRS HDFS, MRS Hudi, MRS ClickHouse, MRS Hive
實時數據湖分析 PostgreSQL Huawei OBS, MRS HDFS, MRS Hudi, MRS ClickHouse, MRS Hive

表1 MRS CDL支持的場景

3 CDL的架構

作為一個CDC系統,能夠從源目標抽取數據並且傳輸到目標存儲中去是基本能力,在此基礎上,靈活、高性能、高可靠、可擴展、可重入、安全是MRS CDL着重考慮的方向,因此,CDL的核心設計原則如下:

  • 系統結構必須滿足可擴展性原則,支持在不損害現有系統功能的前提下添加新的源和目標數據存儲。
  • 架構設計應當滿足不同角色間的業務側重點分離
  • 在合理的情況下減少復雜性和依賴性,最大限度的降低架構、安全性、韌性方面的風險。
  • 需要滿足插件式的客戶需求,提供通用的插件能力,使得系統靈活、易用、可配置。
  • 業務安全,避免橫向越權和信息泄露。

3.1 架構圖/角色介紹


圖1 CDL架構

MRS CDL包含CDL Service和CDL Connector兩個角色,他們各自的職能如下:

  • CDL Service:負責任務的管理和調度,提供統一的API接口,同時監測整個CDL服務的健康狀態。
  • CDL Connector:本質上是Kafka Connect的Worker進程,負責真實Task的運行,在Kafka Connect高可靠、高可用、可擴展的特性基礎上增加了心跳機制來協助CDL Service完成集群的健康監測。

3.2 為什么選擇Kafka?

我們將Apache Kafka與Flume和Nifi等各種其他選項進行了比較,如下表所示:

|Flume|Nifi|Kafka
:--😐:--😐:--😐:--:
優點|基於配置的Agent架構;攔截器;Source、Channel、Sink模型| 有許多開箱即用的處理器;背壓機制;處理任意大小的消息;支持MiNifi Agent來收集數據;支持邊緣層數據流|可擴展、分布式、高容錯、高吞吐量的消息傳遞系統;背壓機制;無數據丟失;Kafka Connect支持Source、Sink模型;超過50種可用的Connector;消息保序;低耦合
缺點|存在數據丟失的場景;沒有數據備份;數據大小限制;沒有背壓機制|沒有數據復制;脆弱的容錯機制;不支持消息保序;可擴展性較差|消息大小限制

表1 框架比較

對於CDC系統,Kafka有足夠的優勢來支撐我們做出選擇。同時,Kafka Connect的架構完美契合CDC系統:

  • 並行 - 對於一個數據復制任務,可以通過拆解成多個子任務並且並行運行來提高吞吐率。
  • 保序 - Kafka的partition機制可以保證在一個partition內數據嚴格有序,這樣有助於我們實現數據完整性。
  • 可擴展 - Kafka Connect在集群中分布式的運行Connector。
  • 易用 - 對Kafka的接口進行了抽象,提升了易用性。
  • 均衡 - Kafka Connect自動檢測故障,並在剩余進程上根據各自負載重新進行均衡調度。
  • 生命周期管理 – 提供完善的Connector的生命周期管理能力。

4 MRS CDL關鍵技術


圖2 CDL關鍵技術

4.1 CDL Job

MRS CDL對業務進行了上層的抽象,通過引入CDL Job的概念來定義一個完整的業務流程。在一個Job中,用戶可以選擇數據源和目標存儲類型,並且可以篩選要復制的數據表。

在Job結構的基礎上,MRS CDL提供執行CDL Job的機制,在運行時,使用Kafka Connect Source Connector結合日志復制技術將CDC事件從源數據存儲捕獲到Kafka,然后使用Kafka Connect Sink Connector從Kafka提取數據,在應用各種轉換規則后將最終結果推送到目標存儲。

提供定義表級和列級映射轉換的機制,在定義CDL Job的過程中可以指定轉換規則。

4.2 Data Comparison

MRS CDL提供一種特殊的Job,用於進行數據一致性對比。用戶可以選擇源和目標數據存儲架構,從源和目標架構中選擇各種比較對進行數據比較,以確保數據在源和目標數據存儲中一致。


圖3 Data Comparison抽象視圖

MRS CDL提供了專用的Rest API來運行Data Compare Job,並且提供如下能力:

  • 提供多樣的數據比較算法,如行哈希算法,非主鍵列比較等。
  • 提供專門的查詢接口,可以查詢同步報表,展示當前Compare任務的執行明細。
  • 提供實時的基於源和目標存儲的修復腳本,一鍵修復不同步數據。

如下是Data Compare Job執行流程:


圖4 Data Compare Job執行和查看流程

4.3 Source Connectors

MRS CDL通過Kafka Connect SDK創建各種源連接器,這些連接器從各種數據源捕獲CDC事件並推送到Kafka。CDL提供專門的Rest API來管理這些數據源連接器的生命周期。

4.3.1 Oracle Source Connector

Oracle Source Connector使用Oracle RDBMS提供的Log Miner接口從Oracle數據庫捕獲DDL和DML事件。


圖5 Log Miner抓取數據示意圖

在處理DML事件時,如果表中存在BOLB/CLOB列,CDL同樣可以提供支持。對於BOLB列的處理,關鍵點處理如下:

  • 當insert/update操作發生時,會觸發一系列的LOB_WRITE操作。
  • LOB_WRITE用於將文件加載到BLOB字段中。
  • 每個LOB_WRITE只能寫入1KB數據。
  • 對於一個1GB的圖片文件,我們會整理全部的100萬個LOB_WRITE操作中的二進制數據,然后合並成一個對象。我們會把這個對象存儲到Huawei OBS中,最終在寫入Kafka的message中給出該對象在OBS中的位置。

對於DDL事件的捕獲,我們創建單獨的會話來持續跟蹤。當前支持的DDL語句如下:

No DDL語句 示例
1 CREATE TABLE CREATE TABLE TEST ( EMPID INT PRIMARY KEY, ENAME VARCHAR2(10))
2 ALTER TABLE ... ADD ( ) ALTER TABLE TEST ADD ( SALARY NUMBER)
3 ALTER TABLE ... DROP COLUMN ... ALTER TABLE TEST DROP (SALARY)
4 ALTER TABLE ... MODIFY ( ... ALTER TABLE TEST MODIFY SALARY INT
5 ALTER ... RENAME... ALTER TABLE TEST RENAME TO CUSTOMER
6 DROP ... DROP TABLE TEST
7 CREATE UNIQUE INDEX ... CREATE UNIQUE INDEX TESTINDEX ON TEST (EMPID, ENAME)
8 DELETE INDEX … Delete existing index

表2 支持的DDL語句

4.3.2 MYSQL Source Connector

MYSQL的Binary Log(Bin Log)文件順序記錄了所有提交到數據庫的操作,包括了對表結構的變更和對表數據的變更。MYSQL Source Connector通過讀取Bin Log文件,生產CDC事件並提交到Kafka的Topic中。

MYSQL Source Connector主要支持的功能場景有:

  • 捕獲DML事件,並且支持並行處理所捕獲的DML事件,提升整體性能
  • 支持表過濾
  • 支持配置表和Topic的映射關系
  • 為了保證CDC事件的絕對順序,我們一般要求一張表只對應一個Partition,但是,MYSQL Source Connector仍然提供了寫入多Partition的能力,來滿足某些需要犧牲消息保序性來提升性能的場景
  • 提供基於指定Bin Log文件、指定位置或GTID來重啟任務的能力,保證異常場景下數據不丟失
  • 支持多種復雜數據類型
  • 支持捕獲DDL事件

4.3.3 PostgreSQL Source Connector

PostgreSQL的邏輯解碼特性允許我們解析提交到事務日志的變更事件,這需要通過輸出插件來處理這些變更。PostgreSQL Source Connector使用pgoutput插件來完成這項工作。pgoutput插件是PostgreSQL 10+提供的標准邏輯解碼插件,無需安裝額外的依賴包。

PostgreSQL Source Connector和MYSQL Source Connector除了部分數據類型的區別外其他功能基本一致。

4.4 Sink Connectors

MRS提供多種Sink Connector,可以從Kafka中拉取數據並推送到不同的目標存儲中。現在支持的Sink Connector有:

  • HDFS Sink Connector
  • OBS Sink Connector
  • Hudi Sink Connector
  • ClickHouse Sink Connector
  • Hive Sink Connector
    其中Hudi Sink Connector和ClickHouse Sink Connector也支持通過Flink/Spark應用來調度運行。

4.5 表過濾

當我們想在一個CDL Job中同時捕獲多張表的變更時,我們可以使用通配符(正則表達式)來代替表名,即允許同時捕獲名稱滿足規則的表的CDC事件。當通配符(正則表達式)不能嚴格匹配目標時,就會出現多余的表被捕獲。為此,CDL提供表過濾功能,來輔助通配符模糊匹配的場景。當前CDL同時支持白名單和黑名單兩種過濾方式。

4.6 統一數據格式

MRS CDL對於不同的數據源類型如Oracle、MYSQL、PostgreSQL采用了統一的消息格式存儲在Kafka中,后端消費者只需解析一種數據格式來進行后續的數據處理和傳輸,避免了數據格式多樣導致后端開發成本增加的問題。

4.7 任務級的日志瀏覽

通常境況下,一個CDL Connector會運行多個Task線程來進行CDC事件的抓取,當其中一個Task失敗時,很難從海量的日志中抽取出強相關的日志信息,來進行進一步的分析。

為了解決如上問題,CDL規范了CDL Connector的日志打印,並且提供了專用的REST API,用戶可以通過該API一鍵獲取指定Connector或者Task的日志文件。甚至可以指定起止時間來進一步縮小日志查詢的范圍。

4.8 監控

MRS CDL提供REST API來查詢CDL服務所有核心部件的Metric信息,包括服務級、角色級、實例級以及任務級。

4.9 應用程序錯誤處理

在業務運行過程中,常常會出現某些消息無法發送到目標數據源的情況,我們把這種消息叫做錯誤記錄。在CDL中,出現錯誤記錄的場景有很多種,比如:

  • Topic中的消息體與特定的序列化方式不匹配,導致無法正常讀取
  • 目標存儲中並不存在消息中所存儲的表名稱,導致消息無法發送到目標端

為了處理這種問題,CDL定義了一種“dead letter queue”,專門用於存儲運行過程中出現的錯誤記錄。本質上“dead letter queue”是由Sink Connector創建的特定的Topic,當出現錯誤記錄時,由Sink Connector將其發往“dead letter queue”進行存儲。

同時,CDL提供了REST API來供用戶隨時查詢這些錯誤記錄進行進一步分析,並且提供Rest API可以允許用戶對這些錯誤記錄進行編輯和重發。


圖6 CDL Application Error Handling

5 性能

CDL使用了多種性能優化方案來提高吞吐量:

  • Task並發
    我們利用Kafka Connect提供的任務並行化功能,其中Connect可以將作業拆分為多個任務來並行復制數據,如下所示:


圖7 Task並發

  • 使用Executor線程並行化執行任務
    由於Log Miner,Bin Log等數據復制技術的限制,我們的Source Connector只能順序的捕獲CDC事件,因此,為了提高性能,我們將這些CDC事件先緩存到內存隊列中,然后使用Executor線程並行的處理它們。這些線程會先從內部隊列中讀取數據,然后處理並且推送到Kafka中。


圖8 Executor線程並發

6 總結

MRS CDL是數據實時入湖場景下重要的一塊拼圖,我們仍然需要在數據一致性、易用性、多組件對接以及性能提升等場景需要進一步擴展和完善,在未來能夠更好的為客戶創造價值。

本文由華為雲發布。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM