一、canal安裝(1.1.4)
-
下載安裝包,解壓,創建軟連接。
-
修改配置文件。
2.1 canal.properties# 可選項: tcp(默認), kafka, RocketMQ canal.serverMode = kafka # kafka/rocketmq 集群配置 canal.mq.servers = nn1.hadoop:9092,nn2.hadoop:9092,s1:9092 # 重試次數 canal.mq.retries = 0 # flagMessage模式下可以調大該值, 但不要超過MQ消息體大小上限 canal.mq.batchSize = 16384 canal.mq.maxRequestSize = 1048576 # flatMessage模式下請將該值改大, 建議50-200 canal.mq.lingerMs = 1 canal.mq.bufferMemory = 33554432 # Canal的batch size, 默認50K, 由於kafka最大消息體限制請勿超過1M(900K以下) canal.mq.canalBatchSize = 50 # Canal get數據的超時時間, 單位: 毫秒, 空為不限超時 canal.mq.canalGetTimeout = 100 # 是否為flat json格式對象 canal.mq.flatMessage = false canal.mq.compressionType = none canal.mq.acks = all # kafka消息投遞是否使用事務 canal.mq.transaction = false
2.2 instance.properties
# 連接的數據庫地址 canal.instance.master.address=192.168.1.20:3306 # 用戶名,密碼 canal.instance.dbUsername = canal canal.instance.dbPassword = canal # 同步的庫 canal.instance.defaultDatabaseName = # 數據庫的編碼格式 canal.instance.connectionCharset = UTF-8 # 表的過濾規則 無論是CanalServer還是Consumer,只要有一方指定了filter都會生效,consumer端如果指定,則會覆蓋CanalServer端。 # 所有表:.* or .*\\..* # canal schema下所有表: canal\\..* # canal下的以canal打頭的表:canal\\.canal.* # canal schema下的一張表:canal.test1 # 多個規則組合使用:canal\\..*,mysql.test1,mysql.test2 (逗號分隔) canal.instance.filter.regex = .\*\\..\* ... # mq config canal.mq.topic=example # 針對庫名或者表名發送動態topic,1.1.3版本后開始支持,需要設置kafka自動創建topic # canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..* # 單隊列模式的分區下標 canal.mq.partition=0 # 散列模式的分區數 # hash partition config #canal.mq.partitionsNum=3 # 散列規則定義 # 庫名.表名 : 唯一主鍵,比如mytest.person:id。多個表之間用逗號分隔 #canal.mq.partitionHash=mytest.person:id,mytest.role:id
2.2.1 canal.mq.dynamicTopic 表達式說明:
canal 1.1.3版本之后, 支持配置格式:schema 或 schema.table,多個配置之間使用逗號或分號分隔:
. 例子1:test\.test 指定匹配的單表,發送到以test_test為名字的topic上
. 例子2:.\.. 匹配所有表,則每個表都會發送到各自表名的topic上
. 例子3:test 指定匹配對應的庫,一個庫的所有表都會發送到庫名的topic上
. 例子4:test\.* 指定匹配的表達式,針對匹配的表會發送到各自表名的topic上
. 例子5:test,test1\.test1,指定多個表達式,會將test庫的表都發送到test的topic上,test1\.test1的表發送到對應的test1_test1 topic上,其余的表發送到默認的canal.mq.topic值
為滿足更大的靈活性,允許對匹配條件的規則指定發送的topic名字,配置格式:topicName:schema 或 topicName:schema.table:
. 例子1: test:test\.test 指定匹配的單表,發送到以test為名字的topic上
. 例子2: test:.\.. 匹配所有表,因為有指定topic,則每個表都會發送到test的topic下
. 例子3: test:test 指定匹配對應的庫,一個庫的所有表都會發送到test的topic下
. 例子4:testA:test\.* 指定匹配的表達式,針對匹配的表會發送到testA的topic下
. 例子5:test0:test,test1:test1\.test1,指定多個表達式,會將test庫的表都發送到test0的topic下,test1\.test1的表發送到對應的test1的topic下,其余的表發送到默認的canal.mq.topic值2.2.2 canal.mq.partitionHash 表達式說明
canal 1.1.3版本之后, 支持配置格式:schema.table:pk1^pk2,多個配置之間使用逗號分隔:
. 例子1:test\.test:pk1^pk2 指定匹配的單表,對應的hash字段為pk1 + pk2
. 例子2:.\..:id 正則匹配,指定所有正則匹配的表對應的hash字段為id
. 例子3:.\..:\(pk\) 正則匹配,指定所有正則匹配的表對應的hash字段為表主鍵(自動查找)
. 例子4: 匹配規則啥都不寫,則默認發到0這個partition上
. 例子5:.\.. ,不指定pk信息的正則匹配,將所有正則匹配的表,對應的hash字段為表名。按表hash: 一張表的所有數據可以發到同一個分區,不同表之間會做散列 (會有熱點表分區過大問題)
. 例子6: test\.test:id,.\..* , 針對test的表按照id散列,其余的表按照table散列
. 注意:設置匹配規則,多條匹配規則之間是按照順序進行匹配(命中一條規則就返回) -
MySQL配置
3.1 開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下:[mysqld] log-bin=mysql-bin # 開啟 binlog binlog-format=ROW # 選擇 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復
3.2 授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
二、kafka_2.11-2.1.1 安裝
三、測試
- 創建 MySQL 測試表
CREATE TABLE `order` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`order_id` varchar(64) NOT NULL COMMENT '訂單ID',
`amount` decimal(10,2) NOT NULL DEFAULT '0.00' COMMENT '訂單金額',
`create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',
PRIMARY KEY (`id`),
UNIQUE KEY `id` (`id`),
UNIQUE KEY `uniq_order_id` (`order_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='訂單表';
- 執行一條 insert 語句
INSERT INTO `order` (order_id, amount) VALUES ('300', 300.0);
啟動一個 kafka 消費者,觀察結果如下:
{
"data":[
{
"id":"3",
"order_id":"300",
"amount":"300.0",
"create_time":"2020-05-13 16:54:19"
}
],
"database":"canal_test",
"es":1589360059000,
"id":3,
"isDdl":false,
"mysqlType":{
"id":"bigint(20)",
"order_id":"varchar(64)",
"amount":"decimal(10,2)",
"create_time":"datetime"
},
"old":null,
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":-5,
"order_id":12,
"amount":3,
"create_time":93
},
"table":"order",
"ts":1589360060236,
"type":"INSERT"
}
- 執行一條 update 語句
UPDATE `order`
SET amount = 350.0
WHERE order_id = '300';
觀察結果如下:
{
"data":[
{
"id":"3",
"order_id":"300",
"amount":"350.0",
"create_time":"2020-05-13 16:54:19"
}
],
"database":"canal_test",
"es":1589361037000,
"id":4,
"isDdl":false,
"mysqlType":{
"id":"bigint(20)",
"order_id":"varchar(64)",
"amount":"decimal(10,2)",
"create_time":"datetime"
},
"old":[
{
"amount":"300.0"
}
],
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":-5,
"order_id":12,
"amount":3,
"create_time":93
},
"table":"order",
"ts":1589361037721,
"type":"UPDATE"
}
- 執行一條 delete 語句
DELETE FROM `order`
WHERE order_id = '300';
結果如下:
{
"data":[
{
"id":"3",
"order_id":"300",
"amount":"350.0",
"create_time":"2020-05-13 16:54:19"
}
],
"database":"canal_test",
"es":1589361229000,
"id":5,
"isDdl":false,
"mysqlType":{
"id":"bigint(20)",
"order_id":"varchar(64)",
"amount":"decimal(10,2)",
"create_time":"datetime"
},
"old":null,
"pkNames":[
"id"
],
"sql":"",
"sqlType":{
"id":-5,
"order_id":12,
"amount":3,
"create_time":93
},
"table":"order",
"ts":1589361230115,
"type":"DELETE"
}