數據實時增量同步工具CDC之Maxwell和Canal


1.什么是CDC?

CDC是Change Data Capture(變更數據獲取)的簡稱。可以基於增量日志,以極低的侵入性來完成增量數據捕獲的工作。核心思想是,監測並捕獲數據庫的變動(包括數據或數據表的插入、更新以及刪除等),
將這些變更按發生的順序完整記錄下來,寫入到消息中間件中以供其他服務進行訂閱及消費。

1.1什么是變更數據捕獲?

 CDC是指從源數據庫捕獲到數據和數據結構(也稱為模式)的增量變更,近乎實時地將這些變更,傳播到其他數據庫或應用程序之處。
通過這種方式,CDC能夠向數據倉庫提供高效、低延遲的數據傳輸,以便信息被及時轉換並交付給專供分析的應用程序。

與批量復制相比,變更數據的捕獲通常具有如下三項基本優勢:

CDC通過僅發送增量的變更,來降低通過網絡傳輸數據的成本。
CDC可以幫助用戶根據最新的數據做出更快、更准確的決策。例如,CDC會將事務直接傳輸到專供分析的應用上。
CDC最大限度地減少了對於生產環境網絡流量的干擾。

常見的CDC工具有:    
    maxwell :基於MYSQL的binlog 
    canal   :基於MYSQL的binlog 
    debezium
    flinkcdc 

注:sqoop不是CDC工具 sqoop是基於查詢的全量數據捕獲.

2.Maxwell

Maxwell 是由美國Zendesk開源,用Java編寫的MySQL實時抓取軟件。 
實時讀取MySQL二進制日志Binlog,並生成 JSON 格式的消息,
作為生產者發送給 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的應用程序。

2.1 Maxwell工作原理

2.1.1  MySQL主從復制過程
  <1>Master主庫將改變記錄,寫到二進制日志(binary log)中 
  <2>Slave從庫向mysql master發送dump協議,將master主庫的binary log events拷貝到它的中繼日志(relay log);
  <3>Slave從庫讀取並重做中繼日志中的事件,將改變的數據同步到自己的數據庫。

2.1.2  MySQL的binlog

 <1> binlog
  MySQL的二進制日志可以說MySQL最重要的日志了,它記錄了所有的DDL和DML(除了數據查詢語句)語句,
  以事件形式記錄,還包含語句所執行的消耗的時間,MySQL的二進制日志是事務安全型的。
  
  一般來說開啟二進制日志大概會有1%的性能損耗。二進制有兩個最重要的使用場景: 
    其一:MySQL Replication在Master端開啟binlog,Master把它的二進制日志傳遞給slaves來達到master-slave數據一致的目的。 
    其二:自然就是數據恢復了,通過使用mysqlbinlog工具來使恢復數據。
    二進制日志包括兩類文件:二進制日志索引文件(文件名后綴為.index)用於記錄所有的二進制文件,二進制日志文件(文件名后綴為.00000*)記錄數據庫所有的DDL和DML(除了數據查詢語句)語句事件。
    
    binlog文件的滾動:1)達到了滾動的大小 2)mysql服務停止

<2>  mysql binlog的格式有三種,分別是STATEMENT,MIXED,ROW
 在配置文件中可以選擇配置    binlog_format= statement|mixed|row
 
 在配置文件中可以選擇配置    binlog_format= statement|mixed|row
    三種格式的區別: 
    statement 
        語句級,binlog會記錄每次一執行寫操作的語句。
        相對row模式節省空間,但是可能產生不一致性,比如
        update  tt set create_date=now() 
        如果用binlog日志進行恢復,由於執行時間不同可能產生的數據就不同。
        優點: 節省空間
        缺點: 有可能造成數據不一致。
    row 
        行級, binlog會記錄每次操作后每行記錄的變化。
        優點:保持數據的絕對一致性。因為不管sql是什么,引用了什么函數,他只記錄執行后的效果。
        缺點:占用較大空間。
    mixed
        statement的升級版,一定程度上解決了,因為一些情況而造成的statement模式不一致問題
        默認還是statement,在某些情況下譬如:
            當函數中包含 UUID() 時;
            包含 AUTO_INCREMENT 字段的表被更新時;
            執行 INSERT DELAYED 語句時;
            用 UDF 時;
        會按照 ROW的方式進行處理
        優點:節省空間,同時兼顧了一定的一致性。
        缺點:還有些極個別情況依舊會造成不一致,另外statement和mixed對於需要對binlog的監控的情況都不方便。
綜合上面對比,Maxwell想做監控分析,選擇row格式比較合適。

2.1.3 修改mysql配置

sudo vim /etc/my.cnf
#添加進去
server-id= 1
#生成bitlog文件的前綴
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall_rt
#想監控多個庫就多加幾個binlog-do-db


#重啟mysql服務 讓配置生效
systemctl restart mysqld

 2.1.4  Maxwell的工作原理

很簡單,就是把自己偽裝成slave,假裝從master復制數據

3.Maxwell安裝配置

3.1 mysql中創建Maxwell賬號和元數據庫

--創建maxwell元數據庫
CREATE DATABASE maxwell ;
--創建maxwell賬號和權限
GRANT ALL   ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT  SELECT ,REPLICATION SLAVE , REPLICATION CLIENT  ON *.* TO maxwell@'%';

3.2 修改maxwell配置文件

--復制配置文件 在maxwell根目錄下
cp config.properties.example config.properties
--修改配置文件
vim  config.properties

producer=kafka
#用,分隔
kafka.bootstrap.servers=Ava01:9092,Ava02:9092,Ava03:9092 
#需要添加
kafka_topic=topic名字 發送到kafka的哪個topic #ods_base_db_m
# mysql login info
host=Ava01
user=maxwell
password=123456
#需要添加 后續初始化會用
client_id=maxwell_1

3.3 maxwell發送數據到消息隊列的分區方式

--發送數據的分區方式是可選的
--可以對數據進行分區,解決數據傾斜問題,
--默認還是輸出到指定Kafka主題的一個kafka分區,因為多個分區並行可能會打亂binlog的順序
--如果要提高並行度,首先設置kafka的分區數>1,然后設置producer_partition_by屬性
--database是指將database名字當做key 同一個數據庫的所有表都會分配到同一個分區
--table table名字作為key 同一張表的數據發送到同一個分區
--primary_key 表中的主鍵為key 數據分配是最均勻的 但是可能會亂序
--transaction_id 事務id
--column 列名
--一般情況下 業務數據要保持消費順序的話 采用 database+table 為key 將同一個庫下的同一張表的數據發送到一個分區 因為kafka只能保證統一分區消費有序
#producer_partition_by=database # [database, table, primary_key, transaction_id, column]

3.4 啟動 需要指定配置文件

./maxwell  --config /opt/module/maxwell-1.25.0/config.propertie

3.5 maxwell發送數據 測試 

#啟動消費者
kafka-console-consumer.sh --bootstrap-server Ava01:9092 --topic maxwell_test

在mysql配置文件里指定哪個庫生成 binlog 就在那個庫里執行一個update或者insert測試一下 觀察消費者消費到的數據

 4. Maxwell發送的數據

4.1 數據條數:

一條sql影響了N行, maxwell會發送N條消息 也就是會有 N個json  比如一個update語句影響了2條 maxwell會產生兩條json

4.2 數據格式

DML
{
    "database": "gmall_rt",
    "table": "cart_info",
    "type": "insert",
    "ts": 1631706456,
    "xid": 7886,
    "xoffset": 13987,
    "data": {
        "id": 148278,
        "user_id": "2059",
        "sku_id": 13,
        "cart_price": 4188,
        "sku_num": 1,
        "img_url": "http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-sklaALrngAAHGDqdpFtU741.jpg",
        "sku_name": "華為 HUAWEI P40 麒麟990 5G SoC芯片 5000萬超感知徠卡三攝 30倍數字變焦 6GB+128GB亮黑色全網通5G手機",
        "is_checked": null,
        "create_time": "2021-09-15 19:47:33",
        "operate_time": null,
        "is_ordered": 0,
        "order_time": null,
        "source_type": "2401",
        "source_id": null
    }
}
DDL
{
    "type": "table-create", 
    "database": "gmall_rt", 
    "table": "user_info", 
    "def": {
        "database": "gmall_rt", 
        "charset": "utf8", 
        "table": "user_info", 
        "columns": [
            {
                "type": "int", 
                "name": "id", 
                "signed": true
            }, 
            {
                "type": "varchar", 
                "name": "name", 
                "charset": "utf8"
            }
        ], 
        "primary-key": [ ]
    }, 
    "ts": 1631343066000, 
    "sql": "create table user_info(
	id int,
	`name` varchar(25)
)"
}

4.3 maxwell的元數據

maxwell的元數據庫的position表,記錄讀取的binlog文件和位置
支持斷點還原

4.5 HA

maxwell一般不支持高可用,但是 較新版本,已經添加了 ha的功能,但是還在實驗中

4.6 maxwell初始化功能

 maxwell支持歷史數據同步功能,也就是將沒有生成binlog之前的數據也讀取出來
 支持 SELECT * FROM table 的方式進行全量數據初始化
 因為本身記錄了position所以最大程度的保持了數據一致

 --執行命令 
bin/maxwell-bootstrap --user maxwell  --password 123456 --host Ava01  --database gmall_rt --table user_info --client_id maxwell_1



#參數解釋
--user maxwell  
數據庫分配的操作 maxwell數據庫的用戶名
--password 123456
數據庫分配的操作maxwell數據庫的密碼
--host
數據庫主機名
--database
數據庫名
--table
表名
--client_id
maxwell-bootstrap不具備將數據直接導入kafka或者hbase的能力,
通過--client_id指定將數據交給哪個maxwell進程處理,在maxwell的conf.properties中配置

 

5.maxwell的啟動腳本 指定配置

vim /home/otto/bin/maxwell.sh
/opt/module/maxwell-1.25.0/bin/maxwell --config  /opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &

 

 

====================================================Canal=========================================================================

先說Canal和Maxwell的區別:

1、maxwell支持斷點還原、未來支持HA
      canal支持HA(高可用),不支持斷點還原
2、數據格式
   maxwell輕量級,只有一些核心關鍵的字段
   canal啥都有,old、sql、.........
   data字段:如果一條sql影響了N行數據
   maxwell有N個json,每個都是json串 方便處理
   canal只有一個json,所以是一個數組 處理時稍顯麻煩
3、maxwell支持歷史數據同步
     canal不支持歷史數據同步,如果需要實現的話 可以新建一張臨時表,將需要同步的數據插入臨時表,使用canal監聽臨時表(數據重放、回溯)
4. 數字類型
當原始數據是數字類型時,maxwell會尊重原始數據的類型不增加雙引,變為字符串。canal一律轉換為字符串。
5.帶原始數據字段定義
canal數據中會帶入表結構。maxwell更簡潔。

1.Canal

canal是用java開發的基於數據庫增量日志解析,提供增量數據訂閱&消費的中間件。
目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client來處理獲得的相關數據。(數據庫同步需要阿里的otter中間件,基於canal)。

1.1 Canal的使用場景: 

 (1)    原始場景: 阿里otter中間件的一部分
otter是阿里用於進行異地數據庫之間的同步框架,canal是其中一部分。
 (2)     常見場景1:更新緩存
 (3)    常見場景2:抓取業務數據新增變化表,用於制作拉鏈表。
 (4)    常見場景3:抓取業務表的新增變化數據,用於制作實時統計(我們就是這種場景)

1.2 Canal的工作原理

和Maxwell類似 都是假裝skave讀取binlog

1.3 Canal的架構

1.4 安裝配置

#給Canal也創建一個Mysql用戶
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY '密碼' ;
#注:Canal的壓縮包解壓之后是散的 所以需要提前創建目錄

#解壓之后
cd /opt/module/canal/conf
vim canal.properties

注: canal的端口號默認是11111
# 修改canal的輸出model,默認tcp,改為輸出到kafka
# tcp, kafka, RocketMQ
canal.serverMode = kafka

#消息隊列的配置項
##################################################
#########                    MQ                      #############
##################################################
canal.mq.servers = Ava01:9092,Ava02:9092,Ava03:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =

#創建多個實例:
#一個canal服務中可以有多個instance,conf/下的每一個example即是一個實例,
#每個實例下面都有獨立的配置文件。默認只有一個實例example,如果需要多個實例處理不同的MySQL數據的話,直接拷貝出多個example,
#並對其重新命名,命名和配置文件中指定的名稱一致,然后修改canal.properties中的canal.destinations=實例1,實例2,實例3。
canal.destinations = example

#只讀取一個MySQL數據,所以只有一個實例,這個實例的配置文件在conf/example目錄下

cd /opt/module/canal/conf/example
vim instance.properties

#配置數據庫地址和用戶名密碼
canal.instance.master.address=Ava01:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456

# mq config
# 修改輸出到Kafka的主題以及分區數
canal.mq.topic=canal_test
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*

注意:默認還是輸出到指定Kafka主題的一個kafka分區,因為多個分區並行可能會打亂binlog的順序
如果要提高並行度,首先設置kafka的分區數>1,然后設置canal.mq.partitionHash屬性

2.Canal測試 (和上面maxwell測試一樣)

3.CanalHA模式 

這種zookeeper為觀察者監控的模式,只能實現高可用,而不是負載均衡.
即同一時點只有一個canal-server節點能夠監控某個數據源,只要這個節點能夠正常工作,
那么其他監控這個數據源的canal-server只能做stand-by,直到工作節點停掉,其他canal-server節點才能搶占。
因為有一個stand-by也要占用資源,同時canal傳輸數據宕機的情況也比較少,所以好多企業是不配置canal的高可用的。

4.Canal數據格式

#ddl
{
    "data": null,
    "database": "gmall_rt",
    "es": 1631347983000,
    "id": 2,
    "isDdl": true,
    "mysqlType": null,
    "old": null,
    "pkNames": null,
    "sql": "create table test_log (id VARCHAR(5) ,name VARCHAR(20))",
    "sqlType": null,
    "table": "test_log",
    "ts": 1631347983479,
    "type": "CREATE"
}


#影響多條是數組 處理確實不如maxwell方便
{
    "data": [
        {
            "id": "1",
            "name": "zs"
        },
        {
            "id": "2",
            "name": "xf"
        }
    ],
    "database": "gmall_rt",
    "es": 1631348033000,
    "id": 3,
    "isDdl": false,
    "mysqlType": {
        "id": "VARCHAR(5)",
        "name": "VARCHAR(20)"
    },
    "old": null,
    "pkNames": null,
    "sql": "",
    "sqlType": {
        "id": 12,
        "name": 12
    },
    "table": "test_log",
    "ts": 1631348033643,
    "type": "INSERT"
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM