開源基於Canal的開源增量數據訂閱&消費中間件


CanalSync

canal 是阿里巴巴開源的一款基於數據庫增量日志解析,提供增量數據訂閱&消費,目前主要支持了MySQL(也支持mariaDB)。

我開發的這個CanalSync項目 https://github.com/yuzd/CanalSync   ==>覺得不錯幫忙給個star謝謝

是基於canal-server之上的數據庫同步&消費中間件,

用於可快速搭建消費canal-server的項目。 目前我已實現並開源了如下:

  1. 數據消費傳輸到redis組件
  2. 數據消費傳輸到rabbitmq組件
  3. 數據消費傳輸到mysql數據庫組件

示意圖

image

Nuget:

1. 接收canal-server的消息中間件:

Install-Package Canal.Server

2. 解析canal-server消息轉出可執行sql的中間件:

Install-Package Canal.SqlParse

如何使用

如果你需要寫一個數據消費傳輸到XXXMQ,用不到反解析成sql的話,只需要引用 Canal.Server中間件。 如果你需要寫一個數據消費傳輸到XXXdb,得用到反解析sql中間件,需要同時引用Canal.Server 和 Canal.SqlParse 這2個中間件。

Canal.Server 如何使用

  1. 引用 Canal.Server 並appsettings.json 配置canal-server的參數.如下圖:

image

參數說明: image

  1. 創建一個 消費類 必須要 實現: INotificationHandler 接口,例如叫TestHandler
 public class TestHandler:INotificationHandler<CanalBody>{
        public Task Handle(CanalBody notification)
        {
            //寫消費邏輯

            return Task.CompletedTask;;
        }
    }
    
  1. 在startUp 使用並注冊 該消費類
      //注冊了之后 canal-server有新的消息就會進入到TestHandler的Handle方法
      services.AddCanalService(produce => produce.RegisterSingleton<TestHandler>());

Canal.SqlParse 如何使用

目前只實現了解析mysql的邏輯,未來會加入sqlserver的解析邏輯!!

      //注冊使用 connectionString是mysql的數據庫連接字符串
      services.AddMysqlParseService(connectionString);
      
      // 計划中還未實現
      //services.AddSqlserverParseService(connectionString);

在類的構造方法可注入:

image

如上圖,代表將canal-server的數據直接在另外的mysql庫里面執行,等於2個mysql數據進行互相同步。

歐洲與中國的2個mysql庫 使用上述方法進行同步的測試

結果: 同步速度在100~200qps 有點低,

下一步的優化方案是參考otter。消費端改用用aria2來下載

 image


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM