一.概述
為了能夠增強數據的實時性,利用 binlog 將數據寫入到 ClickHouse。然而為了能夠監聽 binlog 事件,需要用到類似 canal 這樣的第三
方中間件,這無疑增加了系統的復雜度。
ClickHouse 20.8.2.3 版本新增加了 MaterializeMySQL 的 database 引擎,該 database 能映 射 到 MySQL 中 的 某 個 database , 並 自 動 在 ClickHouse 中 創 建 對 應 的
ReplacingMergeTree。ClickHouse 服務做為 MySQL 副本,讀取 Binlog 並執行 DDL 和 DML 請求,實現了基於 MySQL Binlog 機制的業務數據庫實時同步功能。
1.特點
(1)MaterializeMySQL 同時支持全量和增量同步,在 database 創建之初會全量同步MySQL 中的表和數據,之后則會通過 binlog 進行增量同步。
(2)MaterializeMySQL database 為其所創建的每張 ReplacingMergeTree 自動增加了_sign 和 _version 字段。
其中,_version 用作 ReplacingMergeTree 的 ver 版本參數,每當監聽到 insert、update和 delete 事件時,在 databse 內全局自增。而 _sign 則用於標記是否被刪除,取值 1 或
者 -1。
目前 MaterializeMySQL 支持如下幾種 binlog 事件:
➢MYSQL_WRITE_ROWS_EVENT:_sign = 1,_version ++
➢MYSQL_DELETE_ROWS_EVENT:_sign = -1,_version ++
➢MYSQL_UPDATE_ROWS_EVENT:新數據 _sign = 1
➢MYSQL_QUERY_EVENT: 支持 CREATE TABLE 、DROP TABLE 、RENAME TABLE 等。
即支持mysql 5.6/5.7/8.0版本數據庫,兼容insert,update,delete,alter,create,drop,truncate等大部分DDL操作。
2. 使用細則
(1)DDL 查詢
MySQL DDL 查詢被轉換成相應的 ClickHouse DDL 查詢(ALTER, CREATE, DROP, RENAME)。如果 ClickHouse 不能解析某些 DDL 查詢,該查詢將被忽略。
(2)數據復制
MaterializeMySQL 不支持直接插入、刪除和更新查詢,而是將 DDL 語句進行相應轉換:
①MySQL INSERT 查詢被轉換為 INSERT with _sign=1。
②MySQL DELETE 查詢被轉換為 INSERT with _sign=-1。
③MySQL UPDATE 查詢被轉換成 INSERT with _sign=1 和 INSERT with _sign=-1。
即使用MaterializedMySQL
數據庫引擎時,ReplacingMergeTree表與虛擬_sign
和_version
列一起使用。
-
_version
— 交易計數器。鍵入UInt64。 -
_sign
— 刪除標記。鍵入Int8。可能的值:-
1
— 未刪除行, -
-1
— 行被刪除。
-
(3)SELECT 查詢
如果在 SELECT 查詢中沒有指定_version,則使用 FINAL 修飾符,返回_version 的最大值對應的數據,即最新版本的數據。
如果在 SELECT 查詢中沒有指定_sign,則默認使用 WHERE _sign=1,即返回未刪除狀態(_sign=1)的數據。
(4)索引轉換
ClickHouse 數據庫表會自動將 MySQL 主鍵和索引子句轉換為 ORDER BY 元組。
ClickHouse 只有一個物理順序,由 ORDER BY 子句決定。如果需要創建新的物理順序,請使用物化視圖。
-
_sign=-1
沒有從表中物理刪除的行。 -
UPDATE/DELETE
引擎不支持級聯查詢MaterializedMySQL
,因為它們在 MySQL 二進制日志中不可見。 -
復制很容易被破壞。
-
禁止對數據庫和表進行手動操作。
-
MaterializedMySQL
受optimize_on_insert 設置影響。MaterializedMySQL
當 MySQL 服務器中的表發生變化時,數據會合並到數據庫中的相應表中。
(5)類型轉換
MySQL | ClickHouse |
---|---|
TINY | Int8 |
SHORT | Int16 |
INT24 | Int32 |
LONG | UInt32 |
LONGLONG | UInt64 |
FLOAT | Float32 |
DOUBLE | Float64 |
DECIMAL, NEWDECIMAL | Decimal |
DATE, NEWDATE | Date32 |
DATETIME, TIMESTAMP | DateTime |
DATETIME2, TIMESTAMP2 | DateTime64 |
YEAR | UInt16 |
TIME | Int64 |
ENUM | Enum |
STRING | String |
VARCHAR, VAR_STRING | String |
BLOB | String |
GEOMETRY | String |
BINARY | FixedString |
BIT | UInt64 |
SET | UInt64 |
(6)創建語句以及配置參數
CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster] ENGINE = MaterializedMySQL('host:port', ['database' | database], 'user', 'password') [SETTINGS ...] [TABLE OVERRIDE table1 (...), TABLE OVERRIDE table2 (...)]
引擎參數
-
host:port
— MySQL 服務器端點。 -
database
— MySQL 數據庫名稱。 -
user
— MySQL 用戶。 -
password
- 用戶密碼。
引擎設置
-
max_rows_in_buffer
— 允許數據在內存中緩存的最大行數(對於單個表且緩存數據無法查詢)。當超過這個數字時,數據將被物化。默認值:65505
。 -
max_bytes_in_buffer
— 允許數據在內存中緩存的最大字節數(對於單個表且緩存數據無法查詢)。當超過這個數字時,數據將被物化。默認值:1048576
。 -
max_rows_in_buffers
— 允許數據在內存中緩存的最大行數(對於數據庫和緩存數據無法查詢)。當超過這個數字時,數據將被物化。默認值:65505
。 -
max_bytes_in_buffers
— 允許數據在內存中緩存的最大字節數(對於數據庫和緩存數據無法查詢)。當超過這個數字時,數據將被物化。默認值:1048576
。 -
max_flush_data_time
— 允許數據在內存中緩存的最大毫秒數(對於數據庫和緩存數據無法查詢)。當超過這個時間時,數據將被物化。默認值:1000
。 -
max_wait_time_when_mysql_unavailable
— MySQL 不可用時的重試間隔(毫秒)。負值禁用重試。默認值:1000
。 -
allows_query_when_mysql_lost
— 允許在 MySQL 丟失時查詢物化表。默認值:(0
)false
。 -
materialized_mysql_tables_list
— 以逗號分隔的 mysql 數據庫表列表,將由 MaterializedMySQL 數據庫引擎復制。默認值:空列表 — 表示將復制整個表。
(7)表覆蓋
表覆蓋可用於自定義 ClickHouse DDL 查詢,允許您為應用程序進行架構優化。這對於控制分區特別有用,這對 MaterializedMySQL 的整體性能很重要。
這些是您可以對 MaterializedMySQL 的表覆蓋進行的模式轉換操作:
- 修改列類型。必須與原始類型兼容,否則復制將失敗。例如,您可以將 UInt32 列修改為 UInt64,但不能將 String 列修改為 Array(String)。
- 修改列 TTL。
- 修改列壓縮編解碼器。
- 添加別名列。
- 添加跳過索引
- 添加投影。請注意,使用時會禁用投影優化
SELECT ... FINAL
(MaterializedMySQL 默認會這樣做),因此它們的實用性在這里受到限制。 - 修改PARTITION BY
- 修改ORDER BY
- 修改主鍵
- 添加樣品
- 添加表 TTL
CREATE DATABASE db_name ENGINE = MaterializedMySQL(...) [SETTINGS ...] [TABLE OVERRIDE table_name ( [COLUMNS ( [col_name [datatype] [ALIAS expr] [CODEC(...)] [TTL expr], ...] [INDEX index_name expr TYPE indextype[(...)] GRANULARITY val, ...] [PROJECTION projection_name (SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY]), ...] )] [ORDER BY expr] [PRIMARY KEY expr] [PARTITION BY expr] [SAMPLE BY expr] [TTL expr] ), ...]
示例:
CREATE DATABASE db_name ENGINE = MaterializedMySQL(...) TABLE OVERRIDE table1 ( COLUMNS ( userid UUID, category LowCardinality(String), timestamp DateTime CODEC(Delta, Default) ) PARTITION BY toYear(timestamp) ), TABLE OVERRIDE table2 ( COLUMNS ( client_ip String TTL created + INTERVAL 72 HOUR ) SAMPLE BY ip_hash )
COLUMNS
名單很稀疏;根據指定修改現有列,添加額外的 ALIAS 列。無法添加普通或 MATERIALIZED 列。具有不同類型的修改列必須可以從原始類型分配。目前在CREATE DATABASE
執行查詢時沒有驗證此問題或類似問題,因此需要格外小心。
可以為尚不存在的表指定覆蓋。
!!!警告如果不小心使用,很容易使用表覆蓋來破壞復制。
(8)注意事項
①MySQL中的每個表都應包含PRIMARY KEY
.
②表的復制,那些包含ENUM
字段值超出范圍(在ENUM
簽名中指定)的行將不起作用。
③如果 ClickHouse 無法解析某些 DDL 查詢,則忽略該查詢。
二.案例實操
1. MySQL 開啟 binlog 和 GTID 模式
(1)確保 MySQL 開啟了 binlog 功能,且格式為 ROW
打開/etc/my.cnf,在[mysqld]下添加:
server-id=1 log-bin=mysql-bin binlog_format=ROW
(2)開啟 GTID 模式
如果 clickhouse 使用的是 20.8 prestable 之后發布的版本,那么 MySQL 還需要配置開啟 GTID 模式, 這種方式在 mysql 主從模式下可以確保數據同步的一致性(主從切換時)。
gtid-mode=on
enforce-gtid-consistency=1 # 設置為主從強一致性
log-slave-updates=1 # 記錄日志
GTID 是 MySQL 復制增強版,從 MySQL 5.6 版本開始支持,目前已經是 MySQL 主流。復制模式。它為每個 event 分配一個全局唯一 ID 和序號,我們可以不用關心 MySQL 集群主從拓撲結構,直接告知 MySQL 這個 GTID 即可。
查詢以下語句進行驗證:
show variables like '%gtid_mode%'; show variables like '%enforce_gtid_consistency%'; show variables like '%binlog_format%';
gtid_mode ON enforce_gtid_consistency ON binlog_format ROW
如以上值,則代表可以進行同步。
(3)重啟 MySQL
sudo systemctl restart mysqld
(4)用戶
同步用戶建議最高級用戶(擁有all權限)或者用戶有RELOAD, REPLICATION SLAVE, REPLICATION CLIENT相關權限。
如為普通用戶【5.7版本mysql】需執行 {必須 *.* 才是服務服務器權限}
grant select,reload,replication slave,replication client on *.* to syncdata@'%' identified by "123";
flush privileges;--刷新權限
2. 准備 MySQL 表和數據
(1)在 MySQL 中創建數據表並寫入數據
CREATE DATABASE testck; CREATE TABLE `testck`.`t_organization` ( `id` int(11) NOT NULL AUTO_INCREMENT, `code` int NOT NULL, `name` text DEFAULT NULL, `updatetime` datetime DEFAULT NULL, PRIMARY KEY (`id`), UNIQUE KEY (`code`) ) ENGINE=InnoDB; INSERT INTO testck.t_organization (code, name,updatetime) VALUES(1000,'Realinsight',NOW()); INSERT INTO testck.t_organization (code, name,updatetime) VALUES(1001, 'Realindex',NOW()); INSERT INTO testck.t_organization (code, name,updatetime) VALUES(1002,'EDT',NOW());
(2)創建第二張表
CREATE TABLE `testck`.`t_user` ( `id` int(11) NOT NULL AUTO_INCREMENT, `code` int, PRIMARY KEY (`id`) ) ENGINE=InnoDB; INSERT INTO testck.t_user (code) VALUES(1);
3. 開啟 ClickHouse 物化引擎
注意:以下語句如果運行設置找不到,需要先在users.xml開啟調用窗口函數使用狀態在/etc/clickhouse-server/users.xml 配置文件中加入<allow_experimental_window_functions>1</allow_experimental_window_functions>
<?xml version="1.0"?> <yandex> <!-- Profiles of settings. --> <profiles> <!-- Default settings. --> <default> <!-- Maximum memory usage for processing single query, in bytes. --> <max_memory_usage>10000000000</max_memory_usage> <!-- How to choose between replicas during distributed query processing. random - choose random replica from set of replicas with minimum number of errors nearest_hostname - from set of replicas with minimum number of errors, choose replica with minimum number of different symbols between replica's hostname and local hostname (Hamming distance). in_order - first live replica is chosen in specified order. first_or_random - if first replica one has higher number of errors, pick a random one from replicas with minimum number of errors. --> <load_balancing>random</load_balancing> <allow_experimental_window_functions>1</allow_experimental_window_functions> </default> <!-- Profile that allows only read queries. --> <readonly> <readonly>1</readonly> </readonly> </profiles> ......
然后開啟 MaterializeMySQL 庫引擎操作權限,然后配置文件中的allow_experimental_window_functions 可以去除【高版本會提示過時,舊版本可能需要保留,看登錄的提示信息】
注意每次登錄默認是關閉的,需要使用再操作打開。
set allow_experimental_database_materialize_mysql=1;
4. 創建復制管道
(1)ClickHouse 中創建 MaterializeMySQL 數據庫
CREATE DATABASE test_binlog ENGINE = MaterializeMySQL('hadoop1:3306','testck','root','000000');
其中 4 個參數分別是 MySQL 地址、databse、username 和 password。
(2)查看 ClickHouse 的數據
use test_binlog; show tables; select * from t_organization; select * from t_user;
5. 修改數據
(1)在 MySQL 中修改數據:
update t_organization set name = CONCAT(name,'-v1') where id = 1
(2)查看 clickhouse 日志可以看到 binlog 監聽事件,查詢 clickhouse
select * from t_organization;
6. 刪除數據
(1)MySQL 刪除數據:
DELETE FROM t_organization where id = 2;
(2)ClicKHouse,日志有 DeleteRows 的 binlog 監聽事件,查看數據:
select * from t_organization;
(3)在剛才的查詢中增加 _sign 和 _version 虛擬字段
select *,_sign,_version from t_organization order by _sign desc,_version desc;
在查詢時,對於已經被刪除的數據,_sign=-1,ClickHouse 會自動重寫 SQL,將 _sign = -1 的數據過濾掉;
對於修改的數據,則自動重寫 SQL,為其增加 FINAL 修飾符。
select * from t_organization --等同於 select * from t_organization final where _sign = 1
7. 刪除表
(1)在 mysql 執行刪除表
drop table t_user;
(2)此時在 clickhouse 處會同步刪除對應表,如果查詢會報錯
show tables; select * from t_user; DB::Exception: Table scene_mms.scene doesn't exist..
(3)mysql 新建表,clickhouse 可以查詢到
CREATE TABLE `testck`.`t_user` ( `id` int(11) NOT NULL AUTO_INCREMENT, `code` int, PRIMARY KEY (`id`) ) ENGINE=InnoDB; INSERT INTO testck.t_user (code) VALUES(1); #ClickHouse 查詢 show tables; select * from t_user;