一、canal 簡介
早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基於業務 trigger 獲取增量變更。從 2010 年開始,業務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。
基於日志增量訂閱和消費的業務包括
- 數據庫鏡像
- 數據庫實時備份
- 索引構建和實時維護(拆分異構索引、倒排索引等)
- 業務 cache 刷新
- 帶業務邏輯的增量數據處理
當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
alibaba/canal: 阿里巴巴 MySQL binlog 增量訂閱&消費組件 (github.com)
二、下載安裝使用
下載鏈接地址 :Release v1.1.5 · alibaba/canal (github.com)
1.准備階段
-
對於自建 MySQL , 需要先開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下
[mysqld] log-bin=mysql-bin # 開啟 binlog binlog-format=ROW # 選擇 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復- 注意:針對阿里雲 RDS for MySQL , 默認打開了 binlog , 並且賬號默認具有 binlog dump 權限 , 不需要任何權限或者 binlog 設置,可以直接跳過這一步
-
授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
請務必查看基本操作官網地址說明:QuickStart · alibaba/canal Wiki (github.com)
2.選擇版本后下載相應的軟件

分別下載 adapter、depyer 、admin(按需下載)
創建文件夾canal-adapter 、canal-admin、canal-deployer 上傳到服務器后在相應文件夾解壓即可
1.修改canal-deployer 配置文件
$ cd /data/canal-deployer/conf/example
$ vim instance.properties
為什么會自帶有example 文件夾,請參考官網:AdminGuide · alibaba/canal Wiki (github.com)
修改數據庫鏈接地址找到配置信息
#數據庫地址
canal.instance.master.address=127.0.0.1:3306
#要同步的數據庫,可空,空的話就是同步全庫
#canal.instance.defaultDatabaseName =
canal.instance.defaultDatabaseName = shop
#上面初始化的用戶信息
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
保存退出即可,啟動命令在/bin文件夾下 執行
$ cd /data/canal-deployer
$ ./bin/startup.sh
關閉命令
$ $ ./bin/stop.sh
2.配置canal-admin
參考文檔 :Canal Admin QuickStart · alibaba/canal Wiki (github.com)
2.然后修改配置文件
$ cd data/canal-admin/conf
$ vim application.yml
server:
port: 8089
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
spring.datasource:
address:127.0.0.1:3306
database: canal_manager
username: canal
password: canal
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
hikari:
maximum-pool-size: 30
minimum-idle: 1
canal:
adminUser: admin
adminPasswd: admin
3.啟動命令
$ cd /data/canal-admin
$ ./bin/startup.sh
可以通過 http://127.0.0.1:8089/ 訪問,默認密碼:admin/123456
4.關閉命令
$ ./bin/stop.sh
5.配置 canal-deployer 連接admin,啟動命令 后面 加 local 即可
$ cd /data/canal-deployer
$ ./bin/startup.sh local
6.啟動 admin、然后啟動 canal-deployer
如果登錄admin 發現 沒有初始化實例和服務,點擊依次創建即可,載入模板-修改配置

3.canal-adapter 配置同步數據增量操作
1.修改配置文件指向自己想要的數據存儲連接,我這邊用的是es7版本。
$ cd /data/canal-adapter/conf
$ vim application.yml
server:
port: 8081
spring:
jackson:
date-format: yyyy-MM-dd HH:mm:ss
time-zone: GMT+8
default-property-inclusion: non_null
canal.conf:
mode: tcp #tcp kafka rocketMQ rabbitMQ
flatMessage: true
zookeeperHosts:
syncBatchSize: 1000
retries: 0
timeout:
accessKey:
secretKey:
consumerProperties:
# canal tcp consumer
canal.tcp.server.host: 127.0.0.1:11111
canal.tcp.zookeeper.hosts:
canal.tcp.batch.size: 500
canal.tcp.username:
canal.tcp.password:
# kafka consumer
kafka.bootstrap.servers: 127.0.0.1:9092
kafka.enable.auto.commit: false
kafka.auto.commit.interval.ms: 1000
kafka.auto.offset.reset: latest
kafka.request.timeout.ms: 40000
kafka.session.timeout.ms: 30000
kafka.isolation.level: read_committed
kafka.max.poll.records: 1000
# rocketMQ consumer
rocketmq.namespace:
rocketmq.namesrv.addr: 127.0.0.1:9876
rocketmq.batch.size: 1000
rocketmq.enable.message.trace: false
rocketmq.customized.trace.topic:
rocketmq.access.channel:
rocketmq.subscribe.filter:
# rabbitMQ consumer
rabbitmq.host:
rabbitmq.virtual.host:
rabbitmq.username:
rabbitmq.password:
rabbitmq.resource.ownerId:
srcDataSources:
defaultDS:
#數據庫地址
url: jdbc:mysql://127.0.0.1:3306/shop?useUnicode=true
username: root
password: root
canalAdapters:
- instance: example # canal instance Name or mq topic name
groups:
- groupId: g1
outerAdapters:
- name: logger
#es7 文件夾下的配置文件
- name: es7
#es 的地址
hosts: localhost:9200 #9300 是transport
properties:
# mode: transport # or rest
mode: rest
# # security.auth: test:123456 # only used for rest mode
cluster.name: elasticsearch
# - name: kudu
# key: kudu
# properties:
# kudu.master.address: 127.0.0.1 # ',' split multi address
2.進入es7文件夾下修改要同步的數據,然后進行配置
$ cd /data/canal-adapter/conf/es7
$ cat mytest_user.yml
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
_index: mytest_user
_id: _id
# upsert: true
# pk: id
#修改成自己想要同步的sql 就可以了
sql: "select a.id as _id, a.name, a.role_id, b.role_name,
a.c_time from user a
left join role b on b.id=a.role_id"
# objFields:
# _labels: array:;
#我沒有這個操作所以注釋掉了
# etlCondition: "where a.c_time>={}"
#批量提交條數
commitBatch: 3000
4.啟動可能會出現數據源類轉換異常,需要下載源代碼然后解決沖突,找到escore項目修改數據源依賴范圍,重新打包找到 client-adapter.es7x-1.1.5-jar-with-dependencies.jar 上傳到plugin文件夾

$ cd /data/canal-adapter/plugin
$ 上傳jar包 刪除舊的 上傳新的

5.配置完成后啟動即可,因為是增量所以要修改數據庫數據,去數據庫更新要更新的語句。
4.去es查詢信息是否更新成功,上面的sql一定要配置成自己數據庫的sql,而且一定要提前創建索引!!!提前創建索引!!!提前創建索引!!!
6.如果是全量操作,編寫好yml文件然后調用接口,把下面的canal-adapter的地址和yml文件修改一下就可以了。借鑒博客:全量同步Elasticsearch方案之Canal - 知乎 (zhihu.com)
curl -X POST http://127.0.0.1:8081/etl/es7/sys_user.yml


https://github.com/alibaba/canal/tree/master/admin/admin-web/src/main/resources