1.什么是CDC?
CDC是Change Data Capture(變更數據獲取)的簡稱。可以基於增量日志,以極低的侵入性來完成增量數據捕獲的工作。核心思想是,監測並捕獲數據庫的變動(包括數據或數據表的插入、更新以及刪除等),
將這些變更按發生的順序完整記錄下來,寫入到消息中間件中以供其他服務進行訂閱及消費。
1.1什么是變更數據捕獲?
CDC是指從源數據庫捕獲到數據和數據結構(也稱為模式)的增量變更,近乎實時地將這些變更,傳播到其他數據庫或應用程序之處。
通過這種方式,CDC能夠向數據倉庫提供高效、低延遲的數據傳輸,以便信息被及時轉換並交付給專供分析的應用程序。
與批量復制相比,變更數據的捕獲通常具有如下三項基本優勢:
CDC通過僅發送增量的變更,來降低通過網絡傳輸數據的成本。
CDC可以幫助用戶根據最新的數據做出更快、更准確的決策。例如,CDC會將事務直接傳輸到專供分析的應用上。
CDC最大限度地減少了對於生產環境網絡流量的干擾。
常見的CDC工具有:
maxwell :基於MYSQL的binlog
canal :基於MYSQL的binlog
debezium
flinkcdc
注:sqoop不是CDC工具 sqoop是基於查詢的全量數據捕獲.
2.Maxwell
Maxwell 是由美國Zendesk開源,用Java編寫的MySQL實時抓取軟件。
實時讀取MySQL二進制日志Binlog,並生成 JSON 格式的消息,
作為生產者發送給 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的應用程序。
2.1 Maxwell工作原理
2.1.1 MySQL主從復制過程
<1>Master主庫將改變記錄,寫到二進制日志(binary log)中
<2>Slave從庫向mysql master發送dump協議,將master主庫的binary log events拷貝到它的中繼日志(relay log);
<3>Slave從庫讀取並重做中繼日志中的事件,將改變的數據同步到自己的數據庫。
2.1.2 MySQL的binlog
<1> binlog
MySQL的二進制日志可以說MySQL最重要的日志了,它記錄了所有的DDL和DML(除了數據查詢語句)語句,
以事件形式記錄,還包含語句所執行的消耗的時間,MySQL的二進制日志是事務安全型的。
一般來說開啟二進制日志大概會有1%的性能損耗。二進制有兩個最重要的使用場景:
其一:MySQL Replication在Master端開啟binlog,Master把它的二進制日志傳遞給slaves來達到master-slave數據一致的目的。
其二:自然就是數據恢復了,通過使用mysqlbinlog工具來使恢復數據。
二進制日志包括兩類文件:二進制日志索引文件(文件名后綴為.index)用於記錄所有的二進制文件,二進制日志文件(文件名后綴為.00000*)記錄數據庫所有的DDL和DML(除了數據查詢語句)語句事件。
binlog文件的滾動:1)達到了滾動的大小 2)mysql服務停止
<2> mysql binlog的格式有三種,分別是STATEMENT,MIXED,ROW。
在配置文件中可以選擇配置 binlog_format= statement|mixed|row
在配置文件中可以選擇配置 binlog_format= statement|mixed|row
三種格式的區別:
statement
語句級,binlog會記錄每次一執行寫操作的語句。
相對row模式節省空間,但是可能產生不一致性,比如
update tt set create_date=now()
如果用binlog日志進行恢復,由於執行時間不同可能產生的數據就不同。
優點: 節省空間
缺點: 有可能造成數據不一致。
row
行級, binlog會記錄每次操作后每行記錄的變化。
優點:保持數據的絕對一致性。因為不管sql是什么,引用了什么函數,他只記錄執行后的效果。
缺點:占用較大空間。
mixed
statement的升級版,一定程度上解決了,因為一些情況而造成的statement模式不一致問題
默認還是statement,在某些情況下譬如:
當函數中包含 UUID() 時;
包含 AUTO_INCREMENT 字段的表被更新時;
執行 INSERT DELAYED 語句時;
用 UDF 時;
會按照 ROW的方式進行處理
優點:節省空間,同時兼顧了一定的一致性。
缺點:還有些極個別情況依舊會造成不一致,另外statement和mixed對於需要對binlog的監控的情況都不方便。
綜合上面對比,Maxwell想做監控分析,選擇row格式比較合適。
2.1.3 修改mysql配置
sudo vim /etc/my.cnf
#添加進去
server-id= 1
#生成bitlog文件的前綴
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall_rt
#想監控多個庫就多加幾個binlog-do-db
#重啟mysql服務 讓配置生效
systemctl restart mysqld
2.1.4 Maxwell的工作原理
很簡單,就是把自己偽裝成slave,假裝從master復制數據
3.Maxwell安裝配置
3.1 mysql中創建Maxwell賬號和元數據庫
--創建maxwell元數據庫
CREATE DATABASE maxwell ;
--創建maxwell賬號和權限
GRANT ALL ON maxwell.* TO 'maxwell'@'%' IDENTIFIED BY '123456';
GRANT SELECT ,REPLICATION SLAVE , REPLICATION CLIENT ON *.* TO maxwell@'%';
3.2 修改maxwell配置文件
--復制配置文件 在maxwell根目錄下
cp config.properties.example config.properties
--修改配置文件
vim config.properties
producer=kafka
#用,分隔
kafka.bootstrap.servers=Ava01:9092,Ava02:9092,Ava03:9092
#需要添加
kafka_topic=topic名字 發送到kafka的哪個topic #ods_base_db_m
# mysql login info
host=Ava01
user=maxwell
password=123456
#需要添加 后續初始化會用
client_id=maxwell_1
3.3 maxwell發送數據到消息隊列的分區方式
--發送數據的分區方式是可選的
--可以對數據進行分區,解決數據傾斜問題,
--默認還是輸出到指定Kafka主題的一個kafka分區,因為多個分區並行可能會打亂binlog的順序
--如果要提高並行度,首先設置kafka的分區數>1,然后設置producer_partition_by屬性
--database是指將database名字當做key 同一個數據庫的所有表都會分配到同一個分區
--table table名字作為key 同一張表的數據發送到同一個分區
--primary_key 表中的主鍵為key 數據分配是最均勻的 但是可能會亂序
--transaction_id 事務id
--column 列名
--一般情況下 業務數據要保持消費順序的話 采用 database+table 為key 將同一個庫下的同一張表的數據發送到一個分區 因為kafka只能保證統一分區消費有序
#producer_partition_by=database # [database, table, primary_key, transaction_id, column]
3.4 啟動 需要指定配置文件
./maxwell --config /opt/module/maxwell-1.25.0/config.propertie
3.5 maxwell發送數據 測試
#啟動消費者
kafka-console-consumer.sh --bootstrap-server Ava01:9092 --topic maxwell_test
在mysql配置文件里指定哪個庫生成 binlog 就在那個庫里執行一個update或者insert測試一下 觀察消費者消費到的數據
4. Maxwell發送的數據
4.1 數據條數:
一條sql影響了N行, maxwell會發送N條消息 也就是會有 N個json 比如一個update語句影響了2條 maxwell會產生兩條json
4.2 數據格式
DML
{
"database": "gmall_rt",
"table": "cart_info",
"type": "insert",
"ts": 1631706456,
"xid": 7886,
"xoffset": 13987,
"data": {
"id": 148278,
"user_id": "2059",
"sku_id": 13,
"cart_price": 4188,
"sku_num": 1,
"img_url": "http://47.93.148.192:8080/group1/M00/00/02/rBHu8l-sklaALrngAAHGDqdpFtU741.jpg",
"sku_name": "華為 HUAWEI P40 麒麟990 5G SoC芯片 5000萬超感知徠卡三攝 30倍數字變焦 6GB+128GB亮黑色全網通5G手機",
"is_checked": null,
"create_time": "2021-09-15 19:47:33",
"operate_time": null,
"is_ordered": 0,
"order_time": null,
"source_type": "2401",
"source_id": null
}
}
DDL
{
"type": "table-create",
"database": "gmall_rt",
"table": "user_info",
"def": {
"database": "gmall_rt",
"charset": "utf8",
"table": "user_info",
"columns": [
{
"type": "int",
"name": "id",
"signed": true
},
{
"type": "varchar",
"name": "name",
"charset": "utf8"
}
],
"primary-key": [ ]
},
"ts": 1631343066000,
"sql": "create table user_info(
id int,
`name` varchar(25)
)"
}
4.3 maxwell的元數據
maxwell的元數據庫的position表,記錄讀取的binlog文件和位置
支持斷點還原
4.5 HA
maxwell一般不支持高可用,但是 較新版本,已經添加了 ha的功能,但是還在實驗中
4.6 maxwell初始化功能
maxwell支持歷史數據同步功能,也就是將沒有生成binlog之前的數據也讀取出來
支持 SELECT * FROM table 的方式進行全量數據初始化
因為本身記錄了position所以最大程度的保持了數據一致
--執行命令
bin/maxwell-bootstrap --user maxwell --password 123456 --host Ava01 --database gmall_rt --table user_info --client_id maxwell_1
#參數解釋
--user maxwell
數據庫分配的操作 maxwell數據庫的用戶名
--password 123456
數據庫分配的操作maxwell數據庫的密碼
--host
數據庫主機名
--database
數據庫名
--table
表名
--client_id
maxwell-bootstrap不具備將數據直接導入kafka或者hbase的能力,
通過--client_id指定將數據交給哪個maxwell進程處理,在maxwell的conf.properties中配置
5.maxwell的啟動腳本 指定配置
vim /home/otto/bin/maxwell.sh
/opt/module/maxwell-1.25.0/bin/maxwell --config /opt/module/maxwell-1.25.0/config.properties >/dev/null 2>&1 &
====================================================Canal=========================================================================
先說Canal和Maxwell的區別:
1、maxwell支持斷點還原、未來支持HA
canal支持HA(高可用),不支持斷點還原
2、數據格式
maxwell輕量級,只有一些核心關鍵的字段
canal啥都有,old、sql、.........
data字段:如果一條sql影響了N行數據
maxwell有N個json,每個都是json串 方便處理
canal只有一個json,所以是一個數組 處理時稍顯麻煩
3、maxwell支持歷史數據同步
canal不支持歷史數據同步,如果需要實現的話 可以新建一張臨時表,將需要同步的數據插入臨時表,使用canal監聽臨時表(數據重放、回溯)
4. 數字類型
當原始數據是數字類型時,maxwell會尊重原始數據的類型不增加雙引,變為字符串。canal一律轉換為字符串。
5.帶原始數據字段定義
canal數據中會帶入表結構。maxwell更簡潔。
1.Canal
canal是用java開發的基於數據庫增量日志解析,提供增量數據訂閱&消費的中間件。
目前,canal主要支持了MySQL的binlog解析,解析完成后才利用canal client來處理獲得的相關數據。(數據庫同步需要阿里的otter中間件,基於canal)。
1.1 Canal的使用場景:
(1) 原始場景: 阿里otter中間件的一部分
otter是阿里用於進行異地數據庫之間的同步框架,canal是其中一部分。
(2) 常見場景1:更新緩存
(3) 常見場景2:抓取業務數據新增變化表,用於制作拉鏈表。
(4) 常見場景3:抓取業務表的新增變化數據,用於制作實時統計(我們就是這種場景)
1.2 Canal的工作原理
和Maxwell類似 都是假裝skave讀取binlog
1.3 Canal的架構
1.4 安裝配置
#給Canal也創建一個Mysql用戶
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY '密碼' ;
#注:Canal的壓縮包解壓之后是散的 所以需要提前創建目錄
#解壓之后
cd /opt/module/canal/conf
vim canal.properties
注: canal的端口號默認是11111
# 修改canal的輸出model,默認tcp,改為輸出到kafka
# tcp, kafka, RocketMQ
canal.serverMode = kafka
#消息隊列的配置項
##################################################
######### MQ #############
##################################################
canal.mq.servers = Ava01:9092,Ava02:9092,Ava03:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =
#創建多個實例:
#一個canal服務中可以有多個instance,conf/下的每一個example即是一個實例,
#每個實例下面都有獨立的配置文件。默認只有一個實例example,如果需要多個實例處理不同的MySQL數據的話,直接拷貝出多個example,
#並對其重新命名,命名和配置文件中指定的名稱一致,然后修改canal.properties中的canal.destinations=實例1,實例2,實例3。
canal.destinations = example
#只讀取一個MySQL數據,所以只有一個實例,這個實例的配置文件在conf/example目錄下
cd /opt/module/canal/conf/example
vim instance.properties
#配置數據庫地址和用戶名密碼
canal.instance.master.address=Ava01:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=123456
# mq config
# 修改輸出到Kafka的主題以及分區數
canal.mq.topic=canal_test
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
注意:默認還是輸出到指定Kafka主題的一個kafka分區,因為多個分區並行可能會打亂binlog的順序
如果要提高並行度,首先設置kafka的分區數>1,然后設置canal.mq.partitionHash屬性
2.Canal測試 (和上面maxwell測試一樣)
3.CanalHA模式
這種zookeeper為觀察者監控的模式,只能實現高可用,而不是負載均衡.
即同一時點只有一個canal-server節點能夠監控某個數據源,只要這個節點能夠正常工作,
那么其他監控這個數據源的canal-server只能做stand-by,直到工作節點停掉,其他canal-server節點才能搶占。
因為有一個stand-by也要占用資源,同時canal傳輸數據宕機的情況也比較少,所以好多企業是不配置canal的高可用的。
4.Canal數據格式
#ddl
{
"data": null,
"database": "gmall_rt",
"es": 1631347983000,
"id": 2,
"isDdl": true,
"mysqlType": null,
"old": null,
"pkNames": null,
"sql": "create table test_log (id VARCHAR(5) ,name VARCHAR(20))",
"sqlType": null,
"table": "test_log",
"ts": 1631347983479,
"type": "CREATE"
}
#影響多條是數組 處理確實不如maxwell方便
{
"data": [
{
"id": "1",
"name": "zs"
},
{
"id": "2",
"name": "xf"
}
],
"database": "gmall_rt",
"es": 1631348033000,
"id": 3,
"isDdl": false,
"mysqlType": {
"id": "VARCHAR(5)",
"name": "VARCHAR(20)"
},
"old": null,
"pkNames": null,
"sql": "",
"sqlType": {
"id": 12,
"name": 12
},
"table": "test_log",
"ts": 1631348033643,
"type": "INSERT"
}