一個實時收集MySql變更記錄的組件CanalSharp.AspNetCore


一、關於CanalSharp

  CanalSharp 是阿里巴巴開源項目 Canal 的 .NET 客戶端。為 .NET 開發者提供一個更友好的使用 Canal 的方式。Canal 是mysql數據庫binlog的增量訂閱&消費組件。

  CanalSharp 是 Canal 的 .NET 客戶端,它與 Canal 是采用的Socket來進行通信的,傳輸協議是TCP,交互協議采用的是 Google Protocol Buffer 3.0。

  CanalSharp的作者是園子里的曉晨WithLin

  更多關於CanalSharp的信息請瀏覽:https://github.com/CanalClient/CanalSharp

  更多關於Canal的信息請瀏覽:https://github.com/alibaba/canal

二、關於CanalSharp.AspNetCore

  CanalSharp.AspNetCore是一個基於CanalSharp的適用於ASP.NET Core的一個后台任務組件,它可以隨着ASP.NET Core實例的啟動而啟動,目前采用輪詢的方式對Canal Server進行監聽(我比較懶,目前主要是借鑒了曉晨的Demo來改寫的),獲得MySql行更改(RowChange)后寫入MySql指定的記錄表中(canal.logs,CanalSharp.AspNetCore會自動幫我們創建這張記錄表)。當然,這只是我目前的業務需求,完全可以改為事件訂閱+自定義輸出的方式進行完善,這是后話了。

  這個項目的GitHub地址為:https://github.com/XiLife-OSPC/CanalSharp.AspNetCore

三、使用前的准備工作

3.1 MySql

  當前的canal開源版本支持8.0及以下的版本,針對阿里雲RDS賬號默認已經有binlog dump權限,不需要任何權限或者binlog設置,可以直接跳過這一步。 開啟binlog寫入功能,並且配置binlog模式為row。

  修改C:\ProgramData\MySQL\MySQL Server 5.7\my.ini的以下內容

log-bin=mysql-bin
binlog-format=Row
server-id=1

  重啟數據庫服務,測試修改是否生效

show variables like 'binlog_format';
show variables like 'log_bin';

  創建一個用戶Canal用於獲取binlog的用戶並授予權限

CREATE USER canal IDENTIFIED BY canal; 
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal @'%';
FLUSH PRIVILEGES;

3.2 Canal-Server

  本文通過Docker方式來啟動Canal-Server,也可以不通過Docker方式來,更多內容可以參考Canal的github。

  通過Docker拉取Canal鏡像:

docker pull canal/canal-server:v1.1.2

  通過以下命令啟動Canal實例:

docker run --restart=always --name core_productservice_canal \
-e canal.instance.master.address=192.168.16.150:3306 \
-e canal.instance.dbUsername=canal \
-e canal.instance.dbPassword=canal \
-e canal.destinations=products \
-e canal.instance.defaultDatabaseName=products_dev \
-e canal.instance.filter.regex=products_dev\\..* \
-e canal.instance.filter.black.regex=products_dev\\.canal.* \
-p 8001:11111 \
-d canal/canal-server:v1.1.2

PS:其中name、destinations、defaultDatabaseName、filter根據要監聽的業務數據庫按需修改。

四、開始使用CanalSharp.AspNetCore

4.1 安裝Nuget包

  通過NuGet或項目引用添加該組件,搜索CanalSharp.AspNetCore

  目前最新版本為0.0.3,支持.NET Core 2.1及以上,暫未弄成.NET Standard類庫。

4.2 添加配置文件項

  在配置文件(appSettings.json)中添加以下配置項:

"Canal": {
    "Enabled": true,
    "LogSource": "Core.Product.Canal",
    "ServerIP": "192.168.16.190", // Canal-Server所在的服務器IP
    "ServerPort": 8001, // Canal-Server所在的服務器Port
    "Destination": "products", // 建議與Canal-Server中配置的destination保持一致
    "Filter": "products_dev\\..*", // 建議與Canal-Server中配置的filter保持一致
    "SleepTime": 50, // SleepTime越短監聽頻率越高但也越耗CPU
    "BufferSize": 2048, // 每次監聽獲取的數據量大小,單位為字節
    "Output": {
      "ConnStr": "Server=192.168.16.150;Port=3306;Database=products_dev;Uid=dev;Pwd=xdp" // 要輸出的日志記錄表所在的數據連接字符串
    }
  }

4.3 在Startup類中注冊

  在StartUp類中的Configure方法中加入以下代碼行:

public void Configure(IApplicationBuilder app, IHostingEnvironment env,
            IApplicationLifetime appLifetime, ILogger<ICanalClientHandler> defaultLogger)
{
    ......
    app.RegisterCanalSharpClient(appLifetime, Configuration, defaultLogger);
}

五、效果演示

  當在指定要監聽的數據庫對某張表的某行數據進行Update或Delete操作后,又或者進行Insert行操作后,canal.logs表會自動記錄變更的記錄數據如下圖:

 

PS:INSERT操作會記錄新增的數據行數據到CurrentValue列,DELETE操作會記錄刪除的數據行數據到PreviousValue列,UPDATE操作則會記錄修改前PreviousValue和修改后的值CurrentValue。對於INSERT和DELETE會生成JSON格式的字符串,例如“{"Id":"12312","Name":"精裝主材","Description":"測試描述",....}”這種。

六、示例項目

點這里:CanalSharp.AspNetCore.Sample

Code有點亂,我還沒來得及做重構和優化,先就這樣吧。

參考資料

李志強,《CanalSharp-mysql數據庫binlog的增量訂閱&消費組件Canal的.NET客戶端

CanalSharp GitHub

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM