一.前言
CanalSharp是阿里巴巴開源項目mysql數據庫binlog的增量訂閱&消費組件 Canal 的.NET客戶端,關於什么是 Canal?又能做什么?我會在后文為大家一一介紹。CanalSharp 這個項目,是由我和 WithLin (主要貢獻) 完成,並將一直進行維護的Canal的.NET客戶端項目。目前開源在github:https://github.com/CanalSharp/CanalSharp/ 希望大家多多支持,旨在為.NET開發者提供一個友好的對接Canal的選擇,為.NET社區生態做貢獻。
二.Canal介紹
1.背景
早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的數據庫同步業務,主要是基於trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基於數據庫的日志解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。
ps. 目前內部版本已經支持mysql和oracle部分版本的日志解析,當前的canal開源版本支持5.7及以下的版本(阿里內部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)
基於日志增量訂閱&消費支持的業務:
- 數據庫鏡像
- 數據庫實時備份
- 多級索引 (賣家和買家各自分庫索引)
- search build
- 業務cache刷新
- 價格變化等重要業務消息
2.工作原理
2.1 mysql主備復制實現
從上層來看,復制分成三步:
- master將改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行查看);
- slave將master的binary log events拷貝到它的中繼日志(relay log);
- slave重做中繼日志中的事件,將改變反映它自己的數據。
2.2 Canal的工作原理
原理相對比較簡單:
- canal模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議
- mysql master收到dump請求,開始推送binary log給slave(也就是canal)
- canal解析binary log對象(原始為byte流)
以上內容摘自Canal項目官方資料 https://github.com/alibaba/canal
3.Canal的安裝以及使用
Canal的安裝以及使用請查閱官方文檔,本文不在贅述。 https://github.com/alibaba/canal/wiki
三.CanalSharp介紹
1.工作原理
CanalSharp 是 Canal 的 .NET 客戶端,它與 Canal 是采用的Socket來進行通信的,傳輸協議是TCP,交互協議采用的是 Google Protocol Buffer 3.0。
2.工作流程
1.Canal連接到mysql數據庫,模擬slave
2.CanalSharp與Canal建立連接
2.數據庫發生變更寫入到binlog
5.Canal向數據庫發送dump請求,獲取binlog並解析
4.CanalSharp向Canal請求數據庫變更
4.Canal發送解析后的數據給CanalSharp
5.CanalSharp收到數據,消費成功,發送回執。(可選)
6.Canal記錄消費位置。
以一張圖來表示:
3.應用場景
CanalSharp作為Canal的客戶端,其應用場景就是Canal的應用場景。關於應用場景在Canal介紹一節已有概述。這里我舉一些實際的使用例子:
1.代替使用輪詢數據庫方式來監控數據庫變更,有效改善輪詢耗費數據庫資源。
2.根據數據庫的變更實時更新搜索引擎,比如電商場景下商品信息發生變更,實時同步到商品搜索引擎 Elasticsearch、solr等
3.根據數據庫的變更實時更新緩存,比如電商場景下商品價格、庫存發生變更實時同步到redis
4.數據庫異地備份、數據同步
5.根據數據庫變更觸發某種業務,比如電商場景下,創建訂單超過xx時間未支付被自動取消,我們獲取到這條訂單數據的狀態變更即可向用戶推送消息。
6.將數據庫變更整理成自己的數據格式發送到kafka等消息隊列,供消息隊列的消費者進行消費。
四.CanalSharp的使用
1.使用前的准備
使用 CanalSharp 之前,必然要先准備好mysql數據庫以及Canal才行,這個步驟請直接查閱Canal官方文檔 https://github.com/alibaba/canal/wiki 。但是為了讓大家能快速跑通CanalSharp,CanalSharp 項目為大家提供了一個通過 docker-compose 同時運行 mysql和canal。
2.通過docker-compose運行mysql和canal:
git clone https://github.com/CanalSharp/CanalSharp.git
cd docker
docker-compose up -d
出現下圖表示運行成功:
3.使用navicat等數據庫管理工具連接mysql
ip:運行docker的服務器ip
mysql用戶:root
mysql密碼:000000
mysql端口:4406
默認提供了一個test數據庫,然后有一張名為test的表。
4.創建一個 .NET Core 控制台項目
5.添加 Nuget 程序包
Install-Package CanalSharp.Client
6.編碼
也可以直接下載源碼運行 Sample 項目 https://github.com/CanalSharp/CanalSharp/tree/master/sample/CanalSharp.SimpleClient
(1)建立連接
//canal 配置的 destination,默認為 example
var destination = "example";
//創建一個簡單CanalClient連接對象(此對象不支持集群)傳入參數分別為 canal地址、端口、destination、用戶名、密碼
var connector = CanalConnectors.NewSingleConnector("127.0.0.1", 11111, destination, "", "");
//連接 Canal
connector.Connect();
//訂閱,同時傳入 Filter。Filter是一種過濾規則,通過該規則的表數據變更才會傳遞過來
//允許所有數據 .*\\..*
//允許某個庫數據 庫名\\..*
//允許某些表 庫名.表名,庫名.表名
connector.Subscribe(".*\\..*");
(2)獲取數據
while (true)
{
//獲取數據 1024表示數據大小 單位為字節
var message = connector.Get(1024);
//批次id 可用於回滾
var batchId = message.Id;
if (batchId == -1 || message.Entries.Count <= 0)
{
Thread.Sleep(300);
continue;
}
PrintEntry(message.Entries);
}
(3)輸出數據
/// <summary>
/// 輸出數據
/// </summary>
/// <param name="entrys">一個entry表示一個數據庫變更</param>
private static void PrintEntry(List<Entry> entrys)
{
foreach (var entry in entrys)
{
if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
{
continue;
}
RowChange rowChange = null;
try
{
//獲取行變更
rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
}
catch (Exception e)
{
}
if (rowChange != null)
{
//變更類型 insert/update/delete 等等
EventType eventType = rowChange.EventType;
//輸出binlog信息 表名 數據庫名 變更類型
Console.WriteLine(
$"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}");
//輸出 insert/update/delete 變更類型列數據
foreach (var rowData in rowChange.RowDatas)
{
if (eventType == EventType.Delete)
{
PrintColumn(rowData.BeforeColumns.ToList());
}
else if (eventType == EventType.Insert)
{
PrintColumn(rowData.AfterColumns.ToList());
}
else
{
Console.WriteLine("-------> before");
PrintColumn(rowData.BeforeColumns.ToList());
Console.WriteLine("-------> after");
PrintColumn(rowData.AfterColumns.ToList());
}
}
}
}
}
/// <summary>
/// 輸出每個列的詳細數據
/// </summary>
/// <param name="columns"></param>
private static void PrintColumn(List<Column> columns)
{
foreach (var column in columns)
{
//輸出列明 列值 是否變更
Console.WriteLine($"{column.Name} : {column.Value} update= {column.Updated}");
}
}
7.測試運行
首次運行會輸出一堆數據,那些都是初始化運行創建表的數據,忽略即可
運行項目,然后一次執行sql觀察輸出:
insert into test values(1000,'111');
update test set name='222' where id=1000;
delete from test where id=1000;
通過新標簽頁打開圖片查看大圖
可以看見我們分別執行 insert、update、delete 語句,我們的CanalSharp都獲取到了數據庫變更。
五.使用Canal的經驗
1.mysql數據庫版本有要求:5.7.13, 5.6.10,、5.5.18和5.1.40/48,不一定非要滿足小版本號的要求,比如 5.7.x、5.6.x、5.5.x都應該可以,但是實際需要自己做測試。前面的具體版本號是Canal官方提供的資料,但是博主公司用的mysql 的版本是5.5.60,是可以正常使用Canal的。
2.mysql數據binlog的格式強烈建議設置為row
3.Canal並非必須連接到master數據庫,它同樣可以連接到slave數據庫,只是從庫出了需要開啟寫入binlog以外還需要設置 log-slave-updates
開啟。
4.如果生產環境已經存在mysql集群,且集群主庫的binlog格式為mixed,mysql數據庫集群的主庫binlog格式可以不用改依然為 mixed,設置某一個從庫binlog格式配置為 row,讓Canal連接從庫,這樣可以避免對生產環境的mysql集群產生影響。
5.mysql支持Statement,MiXED,以及ROW三種格式的binlog為什么推薦使用row格式binlog,經過博主實際測試,使用row格式兼容性是最好的,實際可以自己測試。
六.結束語
CanalSharp的介紹到這里就結束了,如果覺得這個項目有用的歡迎大家來個 star 。后續將會寫幾篇文章介紹更詳細的使用方法以及實戰。
七.資料
CanalSharp 開源地址:https://github.com/CanalSharp/CanalSharp
Canal 開源地址:https://github.com/alibaba/canal