1.前言
在我們系統開發過程中,根據業務場景很多數據庫數據並不會直接給用戶訪問的,需要同步保存到ElasticSearch、Redis等存儲應用當中(例如最常見的是搜索頁面的ElasticSearch數據)。而阿里開源的框架Canal就是做這方面的功能,它可以把數據庫(暫時只支持MySQL和Oracle部分版本)日志解析獲取增量變更同步到其他存儲應用去。
2.什么是Canal?
官網介紹,canal [kə'næl],譯意為水道/管道/溝渠,主要用途是基於MySQL數據庫增量日志解析,提供增量數據訂閱和消費。
從上述介紹我們可以簡單認為Canal就是一個簡單的增量數據同步工具。
2.1MySQL主備復制原理
根據官網介紹,MySQL主備復制原理如下:
●MySQL master(主庫)將數據變更(增刪改)寫入二進制日志(binary log,其中記錄叫做二進制日志事件binary log events,可以通過show binlog events進行查看)。
●MySQL slave(從庫)將master的binary log events 拷貝到它的中繼日志(relay log)。
●MySQL slave(從庫)重放relay log中事件,將數據變更反映它自己的數據。
2.2Canal工作原理
根據官網介紹,Canal工作原理如下圖所示:
●canal模擬MySQL slave的交互協議,偽裝自己為MySQL slave,向MySQL master發送dump協議。
●MySQL master收到dump請求,開始推送binary log給slave(即canal )。
●canal解析binary log對象(原始為byte流),再推送到MySQL、kafka、ElasticSearch等存儲應用當中。
3.Canal能做什么?
早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基於業務trigger(觸發器)獲取增量變更。從2010年開始,業務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。所以Canal就是在這個場景中誕生的,它主要作用就是解決基於日志增量訂閱和消費的業務,例如:
●數據庫鏡像。
●數據庫實時備份。
●索引構建和實時維護(拆分異構索引、倒排索引等)。
●業務緩存刷新。
●帶業務邏輯的增量數據處理(例如ElasticSearch、Redis數據同步)。
在我做過的項目中,cancal經常被用到如下場景:
●根據數據庫的數據變更實時更新搜索引擎數據,比如我司電商場景下物料數據發生變更(例如后台上傳更新物料信息、價格),實時同步到搜索引擎Elasticsearch上。
●根據數據庫的數據變更實時更新緩存,比如專門運營人員每次修改物料品牌信息同時都會同步到Redis上。
●根據數據庫的數據變更實時推送到消息隊列,比如為了豐富自身系統物料庫存,定時作業拉取第三方渠道物料庫存推送到RabbitMQ等消息隊列去消費入庫。
4.如何搭建Canal?
4.1首先得安裝個MySQL數據庫
如果已經安裝好MySQL數據庫的,這一步可以跳過,如果沒有安裝好,請自行安裝(也可以查看我之前寫過一篇MySQL安裝教程,不過個人建議最好還是在Docker上安裝,簡單方便快捷,如果自己手動安裝,不懂點運維基礎知識,坑太多了),具體安裝教程度娘一堆資料。當前的canal支持MySQL版本包括5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
我的MySQL版本是8.0.23,所以canal是支持的。不知道自己安裝是什么版本可以通過SELECT VERSION();命令查看。
4.2數據庫配置
從上述可知,因為canal是模擬MySQL slave的交互協議,偽裝自己為MySQL slave,向MySQL master發送dump協議獲取binary log內容對象的,所以需要MySql開啟binlog。
●修改mysql.cnf中的配置:
-- 編輯mysql.cnf文件 vim /etc/my.cnf; -- 在my.cnf上加入如下配置 [mysqld] log-bin=mysql-bin #開啟binlog binlog-format=ROW #選擇ROW模式 server_id=1 #配置MySQL replaction需要定義,不要和canal的slaveId重復 expire-logs-days=10 #binlog日志保留的天數,清除超過10天的日志,防止日志文件過大,導致磁盤空間不足
●授權canal鏈接MySQL賬號具有作為MySQL slave的權限, 如果已有賬戶可直接grant(我這邊是根據官網示例創建一個canal賬號來演示):
-- 先登錄MySQL mysql -u root -p -- 創建用戶,用戶名:canal,密碼:qwer1234 CREATE USER canal IDENTIFIED BY 'qwer1234'; -- 授予上的所有權限給canal用戶; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- 刷新權限; FLUSH PRIVILEGES;
●查看下MySql是否開啟binlog日志
是否開啟binlog日志:
SHOW VARIABLES LIKE 'log_bin';
查看binlog日志文件列表:
SHOW BINARY LOGS;
查看當前正在寫入的binlog文件:
SHOW MASTER STATUS;
4.3Canal配置
安裝運行Canal服務端,一定要記得先檢查當前Linux系統是否安裝了java8環境,如果沒有安裝啟動Canal時候會有如下提示:
[root@dengwu canal]# sh bin/startup.sh which: no java in (/data/mysql/bin:/data/mysql/lib:/usr/lib64/qt-3.3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin) Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH.
具體安裝步驟如下:
●先從Oracle官網下載JDK安裝包:
通過Xftp工具導入到預先創建/app/package安裝包目錄下,再在/usr目錄下創建java目錄並解壓:
mkdir /usr/java cd /app/package tar zxvf jdk-8u291-linux-x64.tar.gz -C /usr/java
然后配置java環境變量:
vim /etc/profile
用vim編輯器來編輯profile文件,輸入i在文件末尾添加以下內容:
export JAVA_HOME=/usr/java/jdk1.8.0_291 export JRE_HOME=${JAVA_HOME}/jre export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$CLASSPATH export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin export PATH=$PATH:${JAVA_PATH}
配置完java環境變量后,:wq保存退出,看看是否生效:
echo $PATH
如果沒有生效,讓其生效:
source /etc/profile
再瞄瞄java8是否安裝成功:
java -version
●然后下載canal, 訪問release頁面, 選擇需要的包下載, 如最新版本1.1.5為例:
可以使用wget工具下載:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
或者手動下載,通過Xftp等工具拉入安裝包目錄(/app/package)中:
再創建canal安裝目錄解壓安裝包:
mkdir /app/canal cd /app/package tar zxvf canal.deployer-1.1.5.tar.gz -C /app/canal
然后修改配置:
cd /app/canal vi conf/example/instance.properties i :wq
●啟動canal:
cd /app/canal
sh bin/startup.sh
注:Windows使用startup.bat啟動
●查看canal進程是否啟動成功:
ps -ef | grep canal
●查看instance的日志:
vi logs/example/example.log
●關閉canal:
sh bin/stop.sh
●在數據庫中查看從庫信息:
SHOW SLAVE HOSTS;
查看下canal實例(example)配置是否成功。
●記得把canal端口加入防火牆策略去:
-- 允許通過防火牆 firewall-cmd --permanent --zone=public --add-port=11111/tcp -- 從防火牆里移除 firewall-cmd --permanent --zone=public --remove-port=11111/tcp -- 查看端口在防火牆狀態 firewall-cmd --permanent --zone=public --query-port=11111/tcp -- 重啟防火牆 firewall-cmd --reload
注:如果是買阿里雲服務器,要到阿里雲安全組添加允許通過策略。還有Canal Server的默認端口為:11111,若需要修改,可以去/canal/conf目錄下的canal.properties配置文件中進行修改。
5.Canal的.NET客戶端CanalSharp使用
5.1快速入門
●先安裝客戶端:
Install-Package CanalSharp
●初始化日志:
CanalSharp使用Microsoft.Extensions.Logging.Abstractions,因為目前主流日志組件,如:nlog、serilog等,全部支持此日志抽象接入,也就是說你可以通過安裝nlog、serilog對其的適配,來使用它們,無論是Console App或則是Web App。
var loggerFactory = LoggerFactory.Create(builder => { builder .AddFilter("Microsoft", LogLevel.Debug) .AddFilter("System", LogLevel.Information) .AddConsole(); }); var logger= loggerFactory.CreateLogger<SimpleCanalConnection>();
●創建連接:
var conn=new SimpleCanalConnection(new SimpleCanalOptions("127.0.0.1",11111,1234),logger); await conn.ConnectAsync();//連接到Canal Server await conn.SubscribeAsync();//訂閱
●獲取數據:
var msg = await conn.GetAsync(1024);
5.2進階使用
●解析數據
○Entry
上文conn.GetAsync()返回的是一個Entry集合,Entry對應binlog記錄,它可能是事務標記也有可能是行數據變化,通過Entry.EntryType來區分,一般事務的標記在業務消費處理時不需要處理。
示例:
var entries = await conn.GetAsync(1024); foreach (var entry in entries) { //不處理事務標記 if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend) { continue; } }
Entry.Header包含了一些binlog以及數據庫信息:
屬性 |
說明 |
Entry.Header.LogfileName |
binlog文件名 |
Entry.Header.LogfileOffset |
binlog偏移 |
Entry.Header.SchemaName |
mysql schema名稱 |
Entry.Header.TableName |
表名 |
○RowChange
一般在業務處理中,都會需要行數據的變更,將Entry轉換為RowChange對象。
示例:
RowChange rowChange = null; try { rowChange = RowChange.Parser.ParseFrom(entry.StoreValue); } catch (Exception e) { _logger.LogError(e); }
通過RowChange.EventType來Row是什么變化,Update、Delete和Insert對應sql中的update、delete和insert語句,通過RowChange.RowDatas屬性,來訪問RowChange對象中包含的行變化數據集合。示例,遍歷 RowChange 中的行數據:
foreach (var rowData in rowChange.RowDatas) { //刪除的數據 if (eventType == EventType.Delete) { PrintColumn(rowData.BeforeColumns.ToList()); } //插入的數據 else if (eventType == EventType.Insert) { PrintColumn(rowData.AfterColumns.ToList()); } //更新的數據 else { _logger.LogInformation("-------> before"); PrintColumn(rowData.BeforeColumns.ToList()); _logger.LogInformation("-------> after"); PrintColumn(rowData.AfterColumns.ToList()); } } private static void PrintColumn(List<Column> columns) { foreach (var column in columns) { Console.WriteLine($"{column.Name} : {column.Value} update= {column.Updated}"); } }
○Column
Column如其名,代表數據庫中表的每一列的信息:
屬性名 |
說明 |
Column.Name |
列名 |
Column.Value |
列的值 |
Column.Updated |
列是否被更新 |
5.3應答機制
應答機制可以保證消費數據的准確性,Canal服務端會記錄Client消費的進度,需要客戶端發送ACK消息,服務端才會更新進度。類似於在消息隊列中的ACK機制,如RabbitMQ。
●自動應答
await conn.GetAsync(1024);//獲取數據並自動應答 GetAsync()會在獲取數據后,自動向Server發送ack消息。
●手動應答
var msg = await conn.GetWithoutAckAsync(1024);//獲取數據 await conn.AckAsync(msg.Id);//手動應答 await conn.RollbackAsync(msg.Id);//回滾
5.4高可用
這里的高可用分為兩類,客戶端集群和服務端集群。都是采用冷備模式,因為對於binlog數據消費來說,並行處理將會帶來數據順序錯亂的問題,當然你可以通過一些復雜的機制去實現,這里不做說明。集群部署需要Zookeeper組件。
●服務端集群
在conf/canal.properties文件中修改zookeeper地址:
canal.zkServers=127.0.0.1:2181
集群中每個實例需配置相同的zookeeper地址。
●客戶端集群
客戶端集群和服務端集群采用相同的模式,每個實例去搶占鎖,獲得了鎖那么這個實例就運行獲取數據,其他實例做冷備。若正在運行消費數據的實例由於網絡波動,導致和zookeeper失去連接,那么其他客戶端實例不會立即搶占,會等待60s后才執行搶占,給與這個實例恢復的機會。
客戶端集群使用的連接對象和快速入門中的不同:ClusterCanalConnection,但使用方法基本相同。
示例:
//初始化日志 var loggerFactory = LoggerFactory.Create(builder => { builder .AddFilter("Microsoft", LogLevel.Debug) .AddFilter("System", LogLevel.Information) .AddConsole(); }); var logger = loggerFactory.CreateLogger<Program>(); //設置zk地址和clientid,統一集群的client必須相同 var conn = new ClusterCanalConnection( new ClusterCanalOptions("localhost:2181", "12350"),loggerFactory); //連接到Server await conn.ConnectAsync(); //訂閱 await conn.SubscribeAsync(); await conn.RollbackAsync(0); while (true) { try { //獲取數據 var msg = await conn.GetAsync(1024); } catch (Exception e) { _logger.LogError(e,"Error."); //發生異常執行重連,此方法只有集群連接對象才有 await conn.ReConnectAsync(); } }
5.5訂閱
訂閱指過濾表(table)的規則,Canal客戶端發送給客戶端訂閱規則,那么服務端將會推送符合規則的表數據過來,采用正則匹配。
允許所有表:.\*\\\\..\*
6.小結
這里這是簡單介紹Canal工作原理,能做什么,還有.NET客戶端CanalSharp使用,其實Canal涉及知識點還是很多的,例如配置MQ模式、服務集群、Web管理界面部署,多實例等等。后面如果有時間,我還會繼續花時間去學習。
參考文獻:
CanalSharp文檔
CanalSharp