工作原理
canal 譯意為水道,主要用途是基於 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費。
* MySQL主備復制原理
MySQL master 將數據變更寫入二進制日志( binary log, 其中記錄叫做二進制日志事件binary log events,可以通過 show binlog events 進行查看)
MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)
MySQL slave 重放 relay log 中事件,將數據變更反映它自己的數據
* canal 工作原理
canal 模擬 MySQL slave 的交互協議,偽裝自己為 MySQL slave ,向 MySQL master 發送dump 協議
MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
canal 解析 binary log 對象(原始為 byte 流)
mysql開啟binlog
1. 登錄mysql,使用 show variables like 'log_bin'; 命令查看是否已開啟binlog
2. 如果沒有開啟的話,則需要修改my.cnf文件指定binlog信息
# 當前數據庫唯一編號
server-id=12345
# 二進制日志存儲地址(mysql-bin是文件前綴)
log-bin=/var/lib/mysql/logs/mysql-bin
# binlog日志格式,mysql默認采用statement,canal建議使用ROW
binlog_format=ROW
# binlog過期清理時間
expire_logs_days=7
# binlog每個日志文件大小
max_binlog_size=100m
# binlog緩存大小
binlog_cache_size=4m
# 最大binlog緩存大小
max_binlog_cache_size=512m
這里指定了log-bin=/var/lib/mysql/logs目錄,logs是手動創建出來的,且要 chmod -R 777 logs 賦予權限,否則mysql啟動失敗
3. 修改完成之后,重啟mysqld的服務。
canal服務端安裝
1. 下載地址
https://github.com/alibaba/canal/releases/tag/canal-1.0.24
2. 解壓
mkdir canal && tar -zxvf canal.deployer-1.0.24.tar.gz -C canal
3. 修改pid (canal像一個從庫去讀取mysql數據,所以他們的id是不能沖突的,mysql的剛才改為12345了)
vim conf/canal.properties
4. 配置數據庫信息和監聽規則

#################################################
## mysql的id
canal.instance.mysql.slaveId = 12345
# position info
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
# username/password
canal.instance.dbUsername = root
canal.instance.dbPassword = 123456
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#所有庫的所有表
#canal.instance.filter.regex = .*\\..*
#meiye庫的system_user表
#canal.instance.filter.regex =meiye.system_user
#meiye 和 brm 的所有表
canal.instance.filter.regex =meiye.*,brm.*
# table black regex
canal.instance.filter.black.regex =
#################################################
~
代碼測試

1. 導入依賴 <dependency> <groupId>com.xpand</groupId> <artifactId>starter-canal</artifactId> <version>0.0.1-SNAPSHOT</version> </dependency> 2. 配置canal地址信息 canal: client: instances: example: host: 192.168.200.100 port: 11111 batchSize: 1000 3. 啟動類開啟canal注解 @EnableCanalClient //聲明當前的服務是canal的客戶端 4. 測試代碼 import com.alibaba.otter.canal.protocol.CanalEntry; import com.xpand.starter.canal.annotation.*; /** * mysql數據監聽 */ @CanalEventListener public class CanalDataEventListener { /** * 監聽新增事件 * @param eventType 操作事件類型(新增) * @param rowData 發生變化的一行數據 */ @InsertListenPoint public void onEventInsert(CanalEntry.EventType eventType, CanalEntry.RowData rowData){ for (CanalEntry.Column column : rowData.getAfterColumnsList()){ System.out.println("列名:"+column.getName()+"----------變化的數據"+column.getValue()); } } /** * 監聽修改事件 * @param eventType 操作事件類型(修改) * @param rowData 發生變化的一行數據 */ @UpdateListenPoint public void onEventUpdate(CanalEntry.EventType eventType, CanalEntry.RowData rowData){ for (CanalEntry.Column column : rowData.getBeforeColumnsList()){ System.out.println("修改前列名:"+column.getName()+"----------變化的數據"+column.getValue()); } for (CanalEntry.Column column : rowData.getAfterColumnsList()){ System.out.println("修改后列名:"+column.getName()+"----------變化的數據"+column.getValue()); } } /** * 監聽刪除事件 * @param eventType 操作事件類型(刪除) * @param rowData 發生變化的一行數據 */ @DeleteListenPoint public void onEventDelete(CanalEntry.EventType eventType, CanalEntry.RowData rowData){ for (CanalEntry.Column column : rowData.getBeforeColumnsList()){ System.out.println("刪除前的列名:"+column.getName()+"----------變化的數據"+column.getValue()); } } /** * 自定義監聽 */ @ListenPoint( eventType = {CanalEntry.EventType.INSERT, CanalEntry.EventType.DELETE, CanalEntry.EventType.UPDATE}, // 監聽的事件類型 schema = {"changgou_content"}, // 監聽的庫 table = {"tb_content"} // 指定監控的表 ) public void onListenPoint(CanalEntry.EventType eventType, CanalEntry.RowData rowData) { for (CanalEntry.Column column : rowData.getBeforeColumnsList()){ System.out.println("自定義操作前:列名:"+column.getName()+"----------變化的數據"+column.getValue()); } for (CanalEntry.Column column : rowData.getAfterColumnsList()){ System.out.println("自定義操作后:列名:"+column.getName()+"----------變化的數據"+column.getValue()); } } }
。