作者 史鵬宙 CSIG雲與智慧產業事業群研發工程師
ClickHouse作為OLAP分析引擎已經被廣泛使用,數據的導入導出是用戶面臨的第一個問題。由於ClickHouse本身無法很好地支持單條大批量的寫入,因此在實時同步數據方面需要借助其他服務協助。本文給出一種結合Canal+Kafka的方案,並且給出在多個MySQL實例分庫分表的場景下,如何將多張MySQL數據表寫入同一張ClickHouse表的方法,歡迎大家批評指正。
首先來看看我們的需求背景:
-
實時同步多個MySQL實例數據到ClickHouse,每天規模500G,記錄數目億級別,可以接受分鍾級別的同步延遲;
-
某些數據庫表存在分庫分表的操作,用戶需要跨MySQL實例跨數據庫的表同步到ClickHouse的一張表中;
-
現有的MySQL binlog開源組件(Canal),無法做到多張源數據表到一張目的表的映射關系。
基本原理
一、使用JDBC方式同步
-
使用Canal組件完成binlog的解析和數據同步;
-
Canal-Server進程會偽裝成MySQL的slave,使用MySQL的binlog同步協議完成數據同步;
-
Canal-Adapter進程負責從canal-server獲取解析后的binlog,並且通過jdbc接口寫入到ClickHouse;
優點:
- Canal組件原生支持;
缺點:
-
Canal-Adpater寫入時源表和目的表一一對應,靈活性不足;
-
需要維護兩個Canal組件進程;
二、Kafka+ClickHouse物化視圖方式同步
-
Canal-Server完成binlog的解析,並且將解析后的json寫入Kafka;
-
Canal-Server可以根據正則表達式過濾數據庫和表名,並且根據規則寫入Kafka的topic;
-
ClickHouse使用KafkaEngine和Materialized View完成消息消費,並寫入本地表;
優點:
-
Kafka支持水平擴展,可以根據數據規模調整partition數目;
-
Kafka引入后將寫入請求合並,防止ClickHouse生成大量的小文件,從而影響查詢性能;
-
Canal-Server支持規則過濾,可以靈活配置上游的MySQL實例的數據庫名和表名,並且指明寫入的Kafka topic名稱;
缺點:
-
需要維護Kafka和配置規則;
-
ClickHouse需要新建相關的視圖、Kafka Engine的外表等;
具體步驟
一、准備工作
- 如果使用TencentDB,則在控制台確認binlog_format為ROW,無需多余操作。
如果是自建MySQL,則在客戶端中查詢變量:
> show variables like '%binlog%';
+-----------------------------------------+----------------------+
| Variable_name | Value |
+-----------------------------------------+----------------------+
| binlog_format | ROW |
+-----------------------------------------+----------------------+
> show variables like '%log_bin%';
+---------------------------------+--------------------------------------------+
| Variable_name | Value |
+---------------------------------+--------------------------------------------+
| log_bin | ON |
| log_bin_basename | /data/mysql_root/log/20146/mysql-bin |
| log_bin_index | /data/mysql_root/log/20146/mysql-bin.index |
+---------------------------------+--------------------------------------------+
- 創建賬號canal,用於同步binlog
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'canal'@'%';
FLUSH PRIVILEGES;
二、Canal組件部署
前置條件:
Canal組件部署的機器需要跟ClickHouse服務和MySQL網絡互通;
需要在機器上部署java8,配置JAVA_HOME、PATH等環境變量;
基本概念:
1. Canal-Server組件部署
Canal-Server的主要作用是訂閱binlog信息並解析和定義instance相關信息,建議每個Canal-Server進程對應一個MySQL實例;
1)下載canal.deployer-1.1.4.tar.gz,解壓
2)修改配置文件conf/canal.properties,需要關注的配置如下:
...
# 端口相關信息,如果同一台機器部署多個進程需要修改
canal.port = 11111
canal.metrics.pull.port = 11112
canal.admin.port = 11110
...
# 服務模式
canal.serverMode = tcp
...
# Kafka地址
canal.mq.servers = 172.21.48.11:9092
# 使用消息隊列時 這兩個值必須為true
canal.mq.flatMessage = true
canal.mq.flatMessage.onlyData = true
...
# instance列表,conf目錄下必須有同名的目錄
canal.destinations = example,example2
3)配置instance
可以參照example新增新的instance,主要修改配置文件conf/${instance_name}/instance.properties文件。
樣例1: 同步某個數據庫的以XX前綴開頭的表
訂閱 172.21.48.35的MySQL的testdb數據庫中的以tb_開頭的表的數據變更(例如tb_20200801 、 tb_20200802等),主要的步驟如下:
步驟1:創建example2實例:cddeployer/conf && cp -r example example2
步驟2:修改deployer/conf/example2/instance.properties文件
...
# 上游MySQL實例地址
canal.instance.master.address=172.21.48.35:3306
...
# 同步賬戶信息
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
...
# 過濾數據庫名稱和表名
canal.instance.filter.regex=testdb\\.tb_.*,
步驟3:在conf/canal.properties中修改 canal.destinations ,新增example2
樣例2: 同步多個數據庫的以XX前綴開頭的表,且輸出到Kafka
訂閱 172.21.48.35的MySQL的empdb_0數據庫的employees_20200801表,empdb_1數據庫的employees_20200802表,並且數據寫入Kafka;
步驟1:創建example2實例:cddeployer/conf && cp -r example example3
步驟2:修改deployer/conf/example3/instance.properties文件
...
# 上游MySQL實例地址
canal.instance.master.address=172.21.48.35:3306
...
# 同步賬戶信息
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
...
# 過濾數據庫名稱和表名
canal.instance.filter.regex=empdb_.*\\.employees_.*
...
# Kafka的topic名稱和匹配的規則
canal.mq.dynamicTopic=employees_topic:empdb_.*\\.employees_.*
canal.mq.partition=0
# Kafka topic的分區數目(即partition數目)
canal.mq.partitionsNum=3
# 根據employees_開頭的表中的 emp_no字段來進行數據hash,分布到不同的partition
canal.mq.partitionHash=empdb_.*\\.employees_.*:emp_no
步驟3:在Kafka中新建topic employees_topic,指定分區數目為3
步驟4:在conf/canal.properties中修改 canal.destinations ,新增example3;修改服務模式為kafka,配置kafka相關信息;
# 服務模式
canal.serverMode = kafka
...
# Kafka地址
canal.mq.servers = 172.21.48.11:9092
# 使用消息隊列時 這兩個值必須為true
canal.mq.flatMessage = true
canal.mq.flatMessage.onlyData = true
...
# instance列表,conf目錄下必須有同名的目錄
canal.destinations = example,example2,example3
2. Canal-Adapter組件部署(只針對方案一)
Canal-Adapter的主要作用是通過JDBC接口寫入ClickHouse數據,可以配置多個表的寫入;
1)下載canal.adapter-1.1.4.tar.gz,解壓;
2)在lib目錄下新增clickhouse驅動jar包及httpclient的jar包 httpcore-4.4.13.jar、httpclient-4.3.3.jar、clickhouse-jdbc-0.2.4.jar;
3)修改配置文件conf/application.yml文件,修改canalServerHost、srcDataSources、canalAdapters的配置;
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH🇲🇲ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp
canalServerHost: 127.0.0.1:11111 # canal-server的服務地址
batchSize: 500
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
# MySQL的配置,修改用戶名密碼及制定數據庫
srcDataSources:
defaultDS:
url: jdbc:mysql://172.21.48.35:3306
username: root
password: yourpasswordhere
canalAdapters:
- instance: example
groups:
- groupId: g1
outerAdapters:
- name: logger
- name: rdb
key: mysql1
# clickhouse的配置,修改用戶名密碼數據庫
properties:
jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver
jdbc.url: jdbc:clickhouse://172.21.48.18:8123
jdbc.username: default
jdbc.password:
4)修改配置文件conf/rdb/mytest_user.yml文件
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
database: testdb
mirrorDb: true
上述的配置文件中,由於開啟了mirrorDb: true,目的端的ClickHouse必須有相同的數據庫名和表名。
樣例1:源數據庫與目標數據庫名字不同,源表名與目標表名不同
修改adapter的conf/rdb/mytest_user.yml配置文件,指定源數據庫和目標數據庫
dataSourceKey: defaultDS
destination: example
groupId: g1
outerAdapterKey: mysql1
concurrent: true
dbMapping:
database: source_database_name
table: source_table
targetTable: destination_database_name.destination_table
targetColumns:
id:
name:
commitBatch: 3000 # 批量提交的大小
樣例2:多個源數據庫表寫入目的端的同一張表
在conf/rdb 目錄配置多個yml文件,分別指明不同的table名稱。
Kafka 服務配置
一、調整合理的producer參數
確認Canal-Server里的canal.properties文件,重要參數見下表;
二、新建相關的topic名稱
根據Canal-Server里instance里配置文件instance.properties,注意分區數目與canal.mq.partitionsNum 保持一致;
partition數目需要考慮以下因素:
-
上游的MySQL的數據量。原則上數據寫入量越大,應該分配更多的partition數目;
-
考慮下游ClickHouse的實例數目。topic的partition分區總數 最好 不大於 下游ClickHouse的總實例數目,保證每個ClickHouse實例都能至少分配到一個partition;
ClickHouse服務配置
根據上游MySQL實例的表的schema新建數據表;
引入Kafka時需要額外新建Engine=Kafka的外表以及相關的物化視圖表;
建議:
-
為每個外表新增不同的 kafka_group_name,防止相互影響;
-
設置kafka_skip_broken_messages 參數為合理值,遇到無法解析數據會跳過;
-
設置合理的kafka_num_consumers值,最好保證所有ClickHouse實例該值的總和大於 topic的partition數目;
新建相關的分布式查詢表;
服務啟動
啟動相關的Canal組件進程;
-
canal-server: sh bin/startup.sh
-
canal-adapter: sh bin/startup.sh
在MySQL中插入數據,觀察日志是否可以正常運行;
如果使用Kafka,可以通過kafka-console-consumer.sh腳本觀察binlog數據解析;
觀察ClickHouse數據表中是否正常寫入數據;
實際案例
需求:實時同步MySQL實例的empdb_0.employees_20200801表和empdb_1.employees_20200802數據表
方案:使用方案二
環境及參數:
MySQL地址 | 172.21.48.35:3306 |
---|---|
CKafka地址 | 172.21.48.11:9092 |
Canal instance名稱 | employees |
Kafka目的topic | employees_topic |
1.在MySQL新建相關表
# MySQL表的建表語句
CREATE DATABASE `empdb_0`;
CREATE DATABASE `empdb_1`;
CREATE TABLE `empdb_0`.`employees_20200801` (
`emp_no` int(11) NOT NULL,
`birth_date` date NOT NULL,
`first_name` varchar(14) NOT NULL,
`last_name` varchar(16) NOT NULL,
`gender` enum('M','F') NOT NULL,
`hire_date` date NOT NULL,
PRIMARY KEY (`emp_no`)
);
CREATE TABLE `empdb_1`.`employees_20200802` (
`emp_no` int(11) NOT NULL,
`birth_date` date NOT NULL,
`first_name` varchar(14) NOT NULL,
`last_name` varchar(16) NOT NULL,
`gender` enum('M','F') NOT NULL,
`hire_date` date NOT NULL,
PRIMARY KEY (`emp_no`)
);
2. Canal-Server配置
步驟1. 修改conf/canal.properties文件
canal.serverMode = kafka
...
canal.destinations = example,employees
...
canal.mq.servers = 172.21.48.11:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.flatMessage.onlyData = true
canal.mq.compressionType = none
canal.mq.acks = all
canal.mq.producerGroup = cdbproducer
canal.mq.accessChannel = local
...
步驟2. 新增employees實例,修改employees/instances.properties配置
...
canal.instance.master.address=172.21.48.35:3306
...
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
...
canal.instance.filter.regex=empdb_.*\\.employees_.*
...
canal.mq.dynamicTopic=employees_topic:empdb_.*\\.employees_.*
canal.mq.partition=0
canal.mq.partitionsNum=3
canal.mq.partitionHash=empdb_.*\\.employees_.*:emp_no
3. Kafka配置
4. 新增topic employees_topic,分區數為3
5. ClickHouse建表
CREATE DATABASE testckdb ON CLUSTER default_cluster;
CREATE TABLE IF NOT EXISTS testckdb.ck_employees ON CLUSTER default_cluster (
`emp_no` Int32,
`birth_date` String,
`first_name` String,
`last_name` String,
`gender` String,
`hire_date` String
) ENGINE=MergeTree() ORDER BY (emp_no)
SETTINGS index_granularity = 8192;
CREATE TABLE IF NOT EXISTS testckdb.ck_employees_stream ON CLUSTER default_cluster (
`emp_no` Int32,
`birth_date` String,
`first_name` String,
`last_name` String,
`gender` String,
`hire_date` String
) ENGINE = Kafka()
SETTINGS
kafka_broker_list = '172.21.48.11:9092',
kafka_topic_list = 'employees_topic',
kafka_group_name = 'employees_group',
kafka_format = 'JSONEachRow',
kafka_skip_broken_messages = 1024,
kafka_num_consumers = 1;
CREATE MATERIALIZED VIEW IF NOT EXISTS testckdb.ck_employees_mv ON CLUSTER default_cluster TO testckdb.ck_employees(
`emp_no` Int32,
`birth_date` String,
`first_name` String,
`last_name` String,
`gender` String,
`hire_date` String
) AS SELECT
`emp_no`,
`birth_date`,
`first_name`,
`last_name`,
`gender`,
`hire_date`
FROM
testckdb.ck_employees_stream;
CREATE TABLE IF NOT EXISTS testckdb.ck_employees_dis ON CLUSTER default_cluster AS testckdb.ck_employees
ENGINE=Distributed(default_cluster, testckdb, ck_employees);
6. 啟動Canal-Server服務
MySQL實例上游插入數據,觀察數據是否在Canal-Server解析正常,是否在ClickHouse中完成同步。
本文由博客一文多發平台 OpenWrite 發布!