准備
-
對於自建 MySQL , 需要先開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下
[mysqld] log-bin=mysql-bin # 開啟 binlog binlog-format=ROW # 選擇 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復
- 注意:針對阿里雲 RDS for MySQL , 默認打開了 binlog , 並且賬號默認具有 binlog dump 權限 , 不需要任何權限或者 binlog 設置,可以直接跳過這一步
-
授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
安裝canal.server
1 下載壓縮包
到官網地址(release)下載最新壓縮包,請下載 canal.deployer-latest
.tar.gz
2 將canal.deployer 復制到固定目錄並解壓
mkdir -p /usr/local/canal cp canal.deployer-1.1.1.tar.gz /usr/local/canal tar -zxvf canal.deployer-1.1.1.tar.gz
3 配置修改參數
a. 修改instance 配置文件,修改單個數據源的配置 vi conf/example/instance.properties
# 按需修改成自己的數據庫信息 ################################################# ...
#需要同步的mysql地址,端口 canal.instance.master.address=192.168.1.20:3306 # username/password,數據庫的用戶名和密碼 ... canal.instance.dbUsername = root canal.instance.dbPassword = 123456
#需要同步的具體表,也可以使用正則表達式監聽多張表或者庫
canal.instance.filter.regex=realtime.tpp_vehicle_through_points_history ... # mq config
# topic的名稱 canal.mq.topic=topic_start # 針對庫名或者表名發送動態topic #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 # 庫名.表名: 唯一主鍵,多個表之間用逗號分隔 #canal.mq.partitionHash=mytest.person:id,mytest.role:id #################################################
b. 修改canal 配置文件,修改 Canal 全局設置 vi ./conf/canal.properties
# ...
# 可選項: tcp(默認), kafka, RocketMQ
# 更改模式,直接把數據扔進 Kafka
canal.serverMode = kafka
# ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
## Kafka 的地址
canal.mq.servers = 10.255.xx.xx:9092,10.255.xx.xx:9092,10.255.xx.xx:9092
canal.mq.retries = 0
# flagMessage模式下可以調大該值, 但不要超過MQ消息體大小上限
canal.mq.batchSize = 16384
canal.mq.maxRequestSize = 1048576
# flatMessage模式下請將該值改大, 建議50-200
canal.mq.lingerMs = 1
canal.mq.bufferMemory = 33554432
# Canal的batch size, 默認50K, 由於kafka最大消息體限制請勿超過1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get數據的超時時間, 單位: 毫秒, 空為不限超時
canal.mq.canalGetTimeout = 100
# 是否為flat json格式對象
#canal.mq.flatMessage = false
canal.mq.flatMessage = true # 使用文本格式(JSON)進行傳輸,否則 Kafka 里扔進去的是二進制數據,雖然不影響,但是看起來不方便
canal.mq.compressionType = none
canal.mq.acks = all
# kafka消息投遞是否使用事務
canal.mq.transaction = false
詳細參數配置請看:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart
- 開啟 Canal
bin/startup.sh
- 查看日志是否有異常
vim logs/canal/canal.log
vim logs/test/test.log
- 測試,連上數據庫嘗試執行更改 SQL
- 啟動kafka的消費監控:kafka-console-consumer --bootstrap-server app02:9092 --from-beginning --topic topic_start
看得到同步的表名稱和同步的數據類型:UPDATE,INSERT,DELETE
- 如果出現一些意外導致數據錯誤,數據不能采集,刪除/opt/zp/canal/conf/example/h2.mv.db和meta.dat。重啟canal