clickhouse通過MaterializeMySQL庫引擎實現mysql實時同步


一.概述

  為了能夠增強數據的實時性,利用 binlog 將數據寫入到 ClickHouse。然而為了能夠監聽 binlog 事件,需要用到類似 canal 這樣的第三

方中間件,這無疑增加了系統的復雜度。

  ClickHouse 20.8.2.3 版本新增加了 MaterializeMySQL 的 database 引擎,該 database 能映 射 到 MySQL 中 的 某 個 database , 並 自 動 在 ClickHouse 中 創 建 對 應 的

ReplacingMergeTree。ClickHouse 服務做為 MySQL 副本,讀取 Binlog 並執行 DDL 和 DML 請求,實現了基於 MySQL Binlog 機制的業務數據庫實時同步功能。

1.特點

  (1)MaterializeMySQL 同時支持全量和增量同步,在 database 創建之初會全量同步MySQL 中的表和數據,之后則會通過 binlog 進行增量同步。

  (2)MaterializeMySQL database 為其所創建的每張 ReplacingMergeTree 自動增加了_sign 和 _version 字段。

  其中,_version 用作 ReplacingMergeTree 的 ver 版本參數,每當監聽到 insert、update和 delete 事件時,在 databse 內全局自增。而 _sign 則用於標記是否被刪除,取值 1 或

者 -1。

  目前 MaterializeMySQL 支持如下幾種 binlog 事件:

  ➢MYSQL_WRITE_ROWS_EVENT:_sign = 1,_version ++

  ➢MYSQL_DELETE_ROWS_EVENT:_sign = -1,_version ++

  ➢MYSQL_UPDATE_ROWS_EVENT:新數據 _sign = 1

  ➢MYSQL_QUERY_EVENT: 支持 CREATE TABLE 、DROP TABLE 、RENAME TABLE 等。

  即支持mysql 5.6/5.7/8.0版本數據庫,兼容insert,update,delete,alter,create,drop,truncate等大部分DDL操作。

2. 使用細則

(1)DDL 查詢

  MySQL DDL 查詢被轉換成相應的 ClickHouse DDL 查詢(ALTER, CREATE, DROP, RENAME)。如果 ClickHouse 不能解析某些 DDL 查詢,該查詢將被忽略。

(2)數據復制

  MaterializeMySQL 不支持直接插入、刪除和更新查詢,而是將 DDL 語句進行相應轉換:

  ①MySQL INSERT 查詢被轉換為 INSERT with _sign=1。

  ②MySQL DELETE 查詢被轉換為 INSERT with _sign=-1。

  ③MySQL UPDATE 查詢被轉換成 INSERT with _sign=1 和 INSERT with _sign=-1。

  即使用MaterializedMySQL數據庫引擎時,ReplacingMergeTree表與虛擬_sign_version列一起使用。

  • _version— 交易計數器。鍵入UInt64

  • _sign— 刪除標記。鍵入Int8可能的值:

    • 1— 未刪除行,

    • -1— 行被刪除。

(3)SELECT 查詢

  如果在 SELECT 查詢中沒有指定_version,則使用 FINAL 修飾符,返回_version 的最大值對應的數據,即最新版本的數據。

  如果在 SELECT 查詢中沒有指定_sign,則默認使用 WHERE _sign=1,即返回未刪除狀態(_sign=1)的數據。

(4)索引轉換

  ClickHouse 數據庫表會自動將 MySQL 主鍵和索引子句轉換為 ORDER BY 元組。

  ClickHouse 只有一個物理順序,由 ORDER BY 子句決定。如果需要創建新的物理順序,請使用物化視圖。

  • _sign=-1沒有從表中物理刪除的行。

  • UPDATE/DELETE引擎不支持級聯查詢MaterializedMySQL,因為它們在 MySQL 二進制日志中不可見。

  • 復制很容易被破壞。

  • 禁止對數據庫和表進行手動操作。

  • MaterializedMySQL受optimize_on_insert 設置影響。MaterializedMySQL當 MySQL 服務器中的表發生變化時,數據會合並到數據庫中的相應表中。

(5)類型轉換

MySQL ClickHouse
TINY Int8
SHORT Int16
INT24 Int32
LONG UInt32
LONGLONG UInt64
FLOAT Float32
DOUBLE Float64
DECIMAL, NEWDECIMAL Decimal
DATE, NEWDATE Date32
DATETIME, TIMESTAMP DateTime
DATETIME2, TIMESTAMP2 DateTime64
YEAR UInt16
TIME Int64
ENUM Enum
STRING String
VARCHAR, VAR_STRING String
BLOB String
GEOMETRY String
BINARY FixedString
BIT UInt64
SET UInt64

(6)創建語句以及配置參數

CREATE DATABASE [IF NOT EXISTS] db_name [ON CLUSTER cluster]
ENGINE = MaterializedMySQL('host:port', ['database' | database], 'user', 'password') [SETTINGS ...]
[TABLE OVERRIDE table1 (...), TABLE OVERRIDE table2 (...)]

  引擎參數

  • host:port— MySQL 服務器端點。

  • database— MySQL 數據庫名稱。

  • user— MySQL 用戶。

  • password- 用戶密碼。

  引擎設置

  • max_rows_in_buffer— 允許數據在內存中緩存的最大行數(對於單個表且緩存數據無法查詢)。當超過這個數字時,數據將被物化。默認值:65505

  • max_bytes_in_buffer— 允許數據在內存中緩存的最大字節數(對於單個表且緩存數據無法查詢)。當超過這個數字時,數據將被物化。默認值:1048576

  • max_rows_in_buffers— 允許數據在內存中緩存的最大行數(對於數據庫和緩存數據無法查詢)。當超過這個數字時,數據將被物化。默認值:65505

  • max_bytes_in_buffers— 允許數據在內存中緩存的最大字節數(對於數據庫和緩存數據無法查詢)。當超過這個數字時,數據將被物化。默認值:1048576

  • max_flush_data_time— 允許數據在內存中緩存的最大毫秒數(對於數據庫和緩存數據無法查詢)。當超過這個時間時,數據將被物化。默認值:1000

  • max_wait_time_when_mysql_unavailable— MySQL 不可用時的重試間隔(毫秒)。負值禁用重試。默認值:1000

  • allows_query_when_mysql_lost— 允許在 MySQL 丟失時查詢物化表。默認值:( 0false

  • materialized_mysql_tables_list— 以逗號分隔的 mysql 數據庫表列表,將由 MaterializedMySQL 數據庫引擎復制。默認值:空列表 — 表示將復制整個表。

(7)表覆蓋

  表覆蓋可用於自定義 ClickHouse DDL 查詢,允許您為應用程序進行架構優化。這對於控制分區特別有用,這對 MaterializedMySQL 的整體性能很重要。

  這些是您可以對 MaterializedMySQL 的表覆蓋進行的模式轉換操作:

  • 修改列類型。必須與原始類型兼容,否則復制將失敗。例如,您可以將 UInt32 列修改為 UInt64,但不能將 String 列修改為 Array(String)。
  • 修改列 TTL
  • 修改列壓縮編解碼器
  • 添加別名列
  • 添加跳過索引
  • 添加投影請注意,使用時會禁用投影優化SELECT ... FINAL(MaterializedMySQL 默認會這樣做),因此它們的實用性在這里受到限制。
  • 修改PARTITION BY
  • 修改ORDER BY
  • 修改主鍵
  • 添加樣品
  • 添加表 TTL
CREATE DATABASE db_name ENGINE = MaterializedMySQL(...)
[SETTINGS ...]
[TABLE OVERRIDE table_name (
    [COLUMNS (
        [col_name [datatype] [ALIAS expr] [CODEC(...)] [TTL expr], ...]
        [INDEX index_name expr TYPE indextype[(...)] GRANULARITY val, ...]
        [PROJECTION projection_name (SELECT <COLUMN LIST EXPR> [GROUP BY] [ORDER BY]), ...]
    )]
    [ORDER BY expr]
    [PRIMARY KEY expr]
    [PARTITION BY expr]
    [SAMPLE BY expr]
    [TTL expr]
), ...]

  示例:

CREATE DATABASE db_name ENGINE = MaterializedMySQL(...)
TABLE OVERRIDE table1 (
    COLUMNS (
        userid UUID,
        category LowCardinality(String),
        timestamp DateTime CODEC(Delta, Default)
    )
    PARTITION BY toYear(timestamp)
),
TABLE OVERRIDE table2 (
    COLUMNS (
        client_ip String TTL created + INTERVAL 72 HOUR
    )
    SAMPLE BY ip_hash
)

  COLUMNS名單很稀疏根據指定修改現有列,添加額外的 ALIAS 列。無法添加普通或 MATERIALIZED 列。具有不同類型的修改列必須可以從原始類型分配。目前在CREATE DATABASE執行查詢時沒有驗證此問題或類似問題,因此需要格外小心。

  可以為尚不存在的表指定覆蓋。

  !!!警告如果不小心使用,很容易使用表覆蓋來破壞復制。

(8)注意事項

  ①MySQL中的每個表都應包含PRIMARY KEY.

  ②表的復制,那些包含ENUM字段值超出范圍(在ENUM簽名中指定)的行將不起作用。

  ③如果 ClickHouse 無法解析某些 DDL 查詢,則忽略該查詢。

二.案例實操

1. MySQL 開啟 binlog 和 GTID 模式

(1)確保 MySQL 開啟了 binlog 功能,且格式為 ROW

  打開/etc/my.cnf,在[mysqld]下添加:

server-id=1 
log-bin=mysql-bin
binlog_format=ROW

(2)開啟 GTID 模式

  如果 clickhouse 使用的是 20.8 prestable 之后發布的版本,那么 MySQL 還需要配置開啟 GTID 模式, 這種方式在 mysql 主從模式下可以確保數據同步的一致性(主從切換時)。

  gtid-mode=on

  enforce-gtid-consistency=1 # 設置為主從強一致性

  log-slave-updates=1 # 記錄日志

  GTID 是 MySQL 復制增強版,從 MySQL 5.6 版本開始支持,目前已經是 MySQL 主流。復制模式。它為每個 event 分配一個全局唯一 ID 和序號,我們可以不用關心 MySQL 集群主從拓撲結構,直接告知 MySQL 這個 GTID 即可。

  查詢以下語句進行驗證:

show variables like '%gtid_mode%';
show variables like '%enforce_gtid_consistency%';
show variables like '%binlog_format%';
gtid_mode    ON
enforce_gtid_consistency    ON
binlog_format    ROW

  如以上值,則代表可以進行同步。

(3)重啟 MySQL

  sudo systemctl restart mysqld

(4)用戶

  同步用戶建議最高級用戶(擁有all權限)或者用戶有RELOAD, REPLICATION SLAVE, REPLICATION CLIENT相關權限。 

  如為普通用戶【5.7版本mysql】需執行 {必須 *.* 才是服務服務器權限}

grant select,reload,replication slave,replication client on *.* to syncdata@'%' identified  by "123";
flush privileges;--刷新權限

 

2. 准備 MySQL 表和數據

(1)在 MySQL 中創建數據表並寫入數據

CREATE DATABASE testck;
CREATE TABLE `testck`.`t_organization` (
 `id` int(11) NOT NULL AUTO_INCREMENT,
 `code` int NOT NULL,
 `name` text DEFAULT NULL,
 `updatetime` datetime DEFAULT NULL,
 PRIMARY KEY (`id`),
 UNIQUE KEY (`code`)
) ENGINE=InnoDB;

INSERT INTO testck.t_organization (code, name,updatetime) 
VALUES(1000,'Realinsight',NOW());
INSERT INTO testck.t_organization (code, name,updatetime) 
VALUES(1001, 'Realindex',NOW());
INSERT INTO testck.t_organization (code, name,updatetime) 
VALUES(1002,'EDT',NOW());

 

(2)創建第二張表

CREATE TABLE `testck`.`t_user` (
 `id` int(11) NOT NULL AUTO_INCREMENT,
 `code` int,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB;


INSERT INTO testck.t_user (code) VALUES(1);

 

3. 開啟 ClickHouse 物化引擎

  注意:以下語句如果運行設置找不到,需要先在users.xml開啟調用窗口函數使用狀態在/etc/clickhouse-server/users.xml 配置文件中加入<allow_experimental_window_functions>1</allow_experimental_window_functions>

<?xml version="1.0"?>
<yandex>
<!-- Profiles of settings. -->
<profiles>
<!-- Default settings. -->
<default>
<!-- Maximum memory usage for processing single query, in bytes. -->
<max_memory_usage>10000000000</max_memory_usage>
 
<!-- How to choose between replicas during distributed query processing.
random - choose random replica from set of replicas with minimum number of errors
nearest_hostname - from set of replicas with minimum number of errors, choose replica
with minimum number of different symbols between replica's hostname and local hostname
(Hamming distance).
in_order - first live replica is chosen in specified order.
first_or_random - if first replica one has higher number of errors, pick a random one from replicas with minimum number of errors.
-->
<load_balancing>random</load_balancing>
<allow_experimental_window_functions>1</allow_experimental_window_functions>
</default>
 
<!-- Profile that allows only read queries. -->
<readonly>
<readonly>1</readonly>
</readonly>
</profiles>
......

  然后開啟 MaterializeMySQL 庫引擎操作權限,然后配置文件中的allow_experimental_window_functions 可以去除【高版本會提示過時,舊版本可能需要保留,看登錄的提示信息】

  注意每次登錄默認是關閉的,需要使用再操作打開。

set allow_experimental_database_materialize_mysql=1;

 

4. 創建復制管道

(1)ClickHouse 中創建 MaterializeMySQL 數據庫

CREATE DATABASE test_binlog ENGINE = MaterializeMySQL('hadoop1:3306','testck','root','000000');

 

  其中 4 個參數分別是 MySQL 地址、databse、username 和 password。

(2)查看 ClickHouse 的數據

use test_binlog;
show tables;
select * from t_organization;
select * from t_user;

 

5. 修改數據

(1)在 MySQL 中修改數據:

update t_organization set name = CONCAT(name,'-v1') where id = 1

 

(2)查看 clickhouse 日志可以看到 binlog 監聽事件,查詢 clickhouse

select * from t_organization;

 

6. 刪除數據

(1)MySQL 刪除數據:

DELETE FROM t_organization where id = 2;

 

(2)ClicKHouse,日志有 DeleteRows 的 binlog 監聽事件,查看數據:

select * from t_organization;

 

(3)在剛才的查詢中增加 _sign 和 _version 虛擬字段

select *,_sign,_version from t_organization order by _sign 
desc,_version desc;

 

  在查詢時,對於已經被刪除的數據,_sign=-1,ClickHouse 會自動重寫 SQL,將 _sign = -1 的數據過濾掉;

  對於修改的數據,則自動重寫 SQL,為其增加 FINAL 修飾符。

select * from t_organization
--等同於
select * from t_organization final where _sign = 1

 

7. 刪除表

(1)在 mysql 執行刪除表

drop table t_user;

 

(2)此時在 clickhouse 處會同步刪除對應表,如果查詢會報錯

show tables;
select * from t_user;
DB::Exception: Table scene_mms.scene doesn't exist..

 

(3)mysql 新建表,clickhouse 可以查詢到

CREATE TABLE `testck`.`t_user` (
 `id` int(11) NOT NULL AUTO_INCREMENT,
 `code` int,
 PRIMARY KEY (`id`)
) ENGINE=InnoDB;

INSERT INTO testck.t_user (code) VALUES(1);

#ClickHouse 查詢
show tables;
select * from t_user;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM