CanalSync
canal 是阿里巴巴開源的一款基於數據庫增量日志解析,提供增量數據訂閱&消費,目前主要支持了MySQL(也支持mariaDB)。
我開發的這個CanalSync項目 https://github.com/yuzd/CanalSync ==>覺得不錯幫忙給個star謝謝
是基於canal-server之上的數據庫同步&消費中間件,
用於可快速搭建消費canal-server的項目。 目前我已實現並開源了如下:
- 數據消費傳輸到redis組件
- 數據消費傳輸到rabbitmq組件
- 數據消費傳輸到mysql數據庫組件
示意圖
Nuget:
1. 接收canal-server的消息中間件:
Install-Package Canal.Server
2. 解析canal-server消息轉出可執行sql的中間件:
Install-Package Canal.SqlParse
如何使用
如果你需要寫一個數據消費傳輸到XXXMQ,用不到反解析成sql的話,只需要引用 Canal.Server中間件。 如果你需要寫一個數據消費傳輸到XXXdb,得用到反解析sql中間件,需要同時引用Canal.Server 和 Canal.SqlParse 這2個中間件。
Canal.Server 如何使用
- 引用 Canal.Server 並appsettings.json 配置canal-server的參數.如下圖:
- 創建一個 消費類 必須要 實現: INotificationHandler 接口,例如叫TestHandler
public class TestHandler:INotificationHandler<CanalBody>{
public Task Handle(CanalBody notification)
{
//寫消費邏輯
return Task.CompletedTask;;
}
}
- 在startUp 使用並注冊 該消費類
//注冊了之后 canal-server有新的消息就會進入到TestHandler的Handle方法
services.AddCanalService(produce => produce.RegisterSingleton<TestHandler>());
Canal.SqlParse 如何使用
目前只實現了解析mysql的邏輯,未來會加入sqlserver的解析邏輯!!
//注冊使用 connectionString是mysql的數據庫連接字符串
services.AddMysqlParseService(connectionString);
// 計划中還未實現
//services.AddSqlserverParseService(connectionString);
在類的構造方法可注入:
如上圖,代表將canal-server的數據直接在另外的mysql庫里面執行,等於2個mysql數據進行互相同步。
歐洲與中國的2個mysql庫 使用上述方法進行同步的測試
結果: 同步速度在100~200qps 有點低,
下一步的優化方案是參考otter。消費端改用用aria2來下載