一、關於CanalSharp
CanalSharp 是阿里巴巴開源項目 Canal 的 .NET 客戶端。為 .NET 開發者提供一個更友好的使用 Canal 的方式。Canal 是mysql數據庫binlog的增量訂閱&消費組件。
CanalSharp 是 Canal 的 .NET 客戶端,它與 Canal 是采用的Socket來進行通信的,傳輸協議是TCP,交互協議采用的是 Google Protocol Buffer 3.0。
更多關於CanalSharp的信息請瀏覽:https://github.com/CanalClient/CanalSharp
更多關於Canal的信息請瀏覽:https://github.com/alibaba/canal
二、關於CanalSharp.AspNetCore
CanalSharp.AspNetCore是一個基於CanalSharp的適用於ASP.NET Core的一個后台任務組件,它可以隨着ASP.NET Core實例的啟動而啟動,目前采用輪詢的方式對Canal Server進行監聽(我比較懶,目前主要是借鑒了曉晨的Demo來改寫的),獲得MySql行更改(RowChange)后寫入MySql指定的記錄表中(canal.logs,CanalSharp.AspNetCore會自動幫我們創建這張記錄表)。當然,這只是我目前的業務需求,完全可以改為事件訂閱+自定義輸出的方式進行完善,這是后話了。
這個項目的GitHub地址為:https://github.com/XiLife-OSPC/CanalSharp.AspNetCore
三、使用前的准備工作
3.1 MySql
當前的canal開源版本支持8.0及以下的版本,針對阿里雲RDS賬號默認已經有binlog dump權限,不需要任何權限或者binlog設置,可以直接跳過這一步。 開啟binlog寫入功能,並且配置binlog模式為row。
修改C:\ProgramData\MySQL\MySQL Server 5.7\my.ini的以下內容
log-bin=mysql-bin binlog-format=Row server-id=1
重啟數據庫服務,測試修改是否生效
show variables like 'binlog_format'; show variables like 'log_bin';
創建一個用戶Canal用於獲取binlog的用戶並授予權限
CREATE USER canal IDENTIFIED BY canal; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal @'%'; FLUSH PRIVILEGES;
3.2 Canal-Server
本文通過Docker方式來啟動Canal-Server,也可以不通過Docker方式來,更多內容可以參考Canal的github。
通過Docker拉取Canal鏡像:
docker pull canal/canal-server:v1.1.2
通過以下命令啟動Canal實例:
docker run --restart=always --name core_productservice_canal \ -e canal.instance.master.address=192.168.16.150:3306 \ -e canal.instance.dbUsername=canal \ -e canal.instance.dbPassword=canal \ -e canal.destinations=products \ -e canal.instance.defaultDatabaseName=products_dev \ -e canal.instance.filter.regex=products_dev\\..* \ -e canal.instance.filter.black.regex=products_dev\\.canal.* \ -p 8001:11111 \ -d canal/canal-server:v1.1.2
PS:其中name、destinations、defaultDatabaseName、filter根據要監聽的業務數據庫按需修改。
四、開始使用CanalSharp.AspNetCore
4.1 安裝Nuget包
通過NuGet或項目引用添加該組件,搜索CanalSharp.AspNetCore

目前最新版本為0.0.3,支持.NET Core 2.1及以上,暫未弄成.NET Standard類庫。
4.2 添加配置文件項
在配置文件(appSettings.json)中添加以下配置項:
"Canal": { "Enabled": true, "LogSource": "Core.Product.Canal", "ServerIP": "192.168.16.190", // Canal-Server所在的服務器IP "ServerPort": 8001, // Canal-Server所在的服務器Port "Destination": "products", // 建議與Canal-Server中配置的destination保持一致 "Filter": "products_dev\\..*", // 建議與Canal-Server中配置的filter保持一致 "SleepTime": 50, // SleepTime越短監聽頻率越高但也越耗CPU "BufferSize": 2048, // 每次監聽獲取的數據量大小,單位為字節 "Output": { "ConnStr": "Server=192.168.16.150;Port=3306;Database=products_dev;Uid=dev;Pwd=xdp" // 要輸出的日志記錄表所在的數據連接字符串 } }
4.3 在Startup類中注冊
在StartUp類中的Configure方法中加入以下代碼行:
public void Configure(IApplicationBuilder app, IHostingEnvironment env, IApplicationLifetime appLifetime, ILogger<ICanalClientHandler> defaultLogger) { ...... app.RegisterCanalSharpClient(appLifetime, Configuration, defaultLogger); }
五、效果演示
當在指定要監聽的數據庫對某張表的某行數據進行Update或Delete操作后,又或者進行Insert行操作后,canal.logs表會自動記錄變更的記錄數據如下圖:
PS:INSERT操作會記錄新增的數據行數據到CurrentValue列,DELETE操作會記錄刪除的數據行數據到PreviousValue列,UPDATE操作則會記錄修改前PreviousValue和修改后的值CurrentValue。對於INSERT和DELETE會生成JSON格式的字符串,例如“{"Id":"12312","Name":"精裝主材","Description":"測試描述",....}”這種。
六、示例項目
點這里:CanalSharp.AspNetCore.Sample
Code有點亂,我還沒來得及做重構和優化,先就這樣吧。
參考資料
李志強,《CanalSharp-mysql數據庫binlog的增量訂閱&消費組件Canal的.NET客戶端》
