1 概述
MySQL 的用戶群體很大,為了能夠增強數據的實時性,很多解決方案會利用 binlog 將數據寫入到 ClickHouse。為了能夠監聽 binlog 事件,我們需要用到類似 canal 這樣的第三方中間件,這無疑增加了系統的復雜度。
ClickHouse 20.8.2.3 版本新增加了 MaterializeMySQL 的 database 引擎,該 database 能映 射 到 MySQL 中 的 某 個 database , 並 自 動 在 ClickHouse 中 創 建 對 應 的ReplacingMergeTree。ClickHouse 服務
做為 MySQL 副本,讀取 Binlog 並執行 DDL 和 DML 請求,實現了基於 MySQL Binlog 機制的業務數據庫實時同步功能。
1.1 特點
(1)MaterializeMySQL 同時支持
全量和
增量同步,在 database 創建之初會全量同步MySQL 中的表和數據,之后則會通過 binlog 進行增量同步。
(2)MaterializeMySQL database 為其所創建的每張 ReplacingMergeTree 自動增加了_sign 和 _version 字段。
其中,_version 用作 ReplacingMergeTree 的 ver 版本參數,每當監聽到 insert、update和 delete 事件時,在 databse 內全局自增。而 _sign 則用於標記是否被刪除,取值 1 或者 -1。
目前 MaterializeMySQL 支持如下幾種 binlog 事件:
➢MYSQL_WRITE_ROWS_EVENT:_sign = 1,_version ++
➢MYSQL_DELETE_ROWS_EVENT:_sign = -1,_version ++
➢MYSQL_UPDATE_ROWS_EVENT:新數據 _sign = 1
➢MYSQL_QUERY_EVENT: 支持 CREATE TABLE 、DROP TABLE 、RENAME TABLE 等。
1.2 使用細則
(1)DDL 查詢
MySQL DDL 查詢被轉換成相應的 ClickHouse DDL 查詢(ALTER, CREATE, DROP, RENAME)。如果 ClickHouse 不能解析某些 DDL 查詢,該查詢將被忽略。
(2)數據復制
MaterializeMySQL 不支持直接插入、刪除和更新查詢,而是將 DDL 語句進行相應轉換:
MySQL INSERT 查詢被轉換為 INSERT with _sign=1。
MySQL DELETE 查詢被轉換為 INSERT with _sign=-1。
MySQL UPDATE 查詢被轉換成 INSERT with _sign=1 和 INSERT with _sign=-1。
(3)SELECT 查詢
如果在 SELECT 查詢中沒有指定_version,則使用 FINAL 修飾符,返回_version 的最大值對應的數據,即最新版本的數據。
如果在 SELECT 查詢中沒有指定_sign,則默認使用 WHERE _sign=1,即返回未刪除狀態(_sign=1)的數據。
(4)索引轉換
ClickHouse 數據庫表會自動將 MySQL 主鍵和索引子句轉換為 ORDER BY 元組。
ClickHouse 只有一個物理順序,由 ORDER BY 子句決定。如果需要創建新的物理順序,請使用物化視圖。
2 案例實操
2.1 MySQL 開啟 binlog 和 GTID 模式
(1)確保 MySQL 開啟了 binlog 功能,且格式為
ROW
打開/etc/my.cnf,在[mysqld]下添加:
server-id=1 log-bin=mysql-bin binlog_format=ROW
(2)開啟 GTID 模式
如果如果 clickhouse 使用的是 20.8 prestable 之后發布的版本,那么 MySQL 還需要配置開啟 GTID 模式, 這種方式在 mysql 主從模式下可以確保數據同步的一致性(主從切換時)。
gtid-mode=on
enforce-gtid-consistency=1 # 設置為主從強一致性
log-slave-updates=1 # 記錄日志
GTID 是 MySQL 復制增強版,從 MySQL 5.6 版本開始支持,目前已經是 MySQL 主流復制模式。它為每個 event 分配一個全局唯一 ID 和序號,我們可以不用關心 MySQL 集群主從拓撲結構,直接告知MySQL 這個 GTID 即可。
(3)重啟 MySQL
sudo systemctl restart mysqld
2.2 准備 MySQL 表和數據
(1)在 MySQL 中創建數據表並寫入數據
CREATE DATABASE testck; CREATE TABLE `testck`.`t_organization` ( `id` int(11) NOT NULL AUTO_INCREMENT, `code` int NOT NULL, `name` text DEFAULT NULL, `updatetime` datetime DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY (`code`) ) ENGINE=InnoDB; INSERT INTO testck.t_organization (code, name,updatetime) VALUES(1000,'Realinsight',NOW()); INSERT INTO testck.t_organization (code, name,updatetime) VALUES(1001, 'Realindex',NOW()); INSERT INTO testck.t_organization (code, name,updatetime) VALUES(1002,'EDT',NOW());
(2)創建第二張表
CREATE TABLE `testck`.`t_user` ( `id` int(11) NOT NULL AUTO_INCREMENT, `code` int, PRIMARY KEY (`id`) ) ENGINE=InnoDB; INSERT INTO testck.t_user (code) VALUES(1);
2.3 開啟 ClickHouse 物化引擎
set allow_experimental_database_materialize_mysql=1;
2.4 創建復制管道
(1)ClickHouse 中創建 MaterializeMySQL 數據庫
CREATE DATABASE test_binlog ENGINE = MaterializeMySQL('hadoop1:3306','testck','root','000000');
其中 4 個參數分別是 MySQL 地址、databse、username 和 password。
(2)查看 ClickHouse 的數據
use test_binlog;
show tables;
select * from t_organization;
select * from t_user;
2.5 修改數據
(1)在 MySQL 中修改數據:
update t_organization set name = CONCAT(name,'-v1') where id = 1
(2)查看 clickhouse 日志可以看到 binlog 監聽事件,查詢 clickhouse
select * from t_organization;
2.6 刪除數據
(1)MySQL 刪除數據:
DELETE FROM t_organization where id = 2;
(2)ClicKHouse,日志有 DeleteRows 的 binlog 監聽事件,查看數據:
select * from t_organization;
(3)在剛才的查詢中增加 _sign 和 _version 虛擬字段
select *,_sign,_version from t_organization order by _sign desc,_version desc;
在查詢時,對於已經被刪除的數據,_sign=-1,ClickHouse 會自動重寫 SQL,將 _sign = -1 的數據過濾掉;
對於修改的數據,則自動重寫 SQL,為其增加 FINAL 修飾符。
select * from t_organization 等同於 select * from t_organization final where _sign = 1
2.7 刪除表
(1)在 mysql 執行刪除表
drop table t_user;
(2)此時在 clickhouse 處會同步刪除對應表,如果查詢會報錯
show tables; select * from t_user; DB::Exception: Table scene_mms.scene doesn't exist..
(3)mysql 新建表,clickhouse 可以查詢到
CREATE TABLE `testck`.`t_user` ( `id` int(11) NOT NULL AUTO_INCREMENT, `code` int, PRIMARY KEY (`id`) ) ENGINE=InnoDB; INSERT INTO testck.t_user (code) VALUES(1); #ClickHouse 查詢 show tables; select * from t_user;