mysql數據庫變更監控(canal)


背景:

1. 一些項目的基礎功能會有Audit Trace, 以記錄系統用戶所做過的所有記錄。

2. 實時備份數據,比如mysql主從復制,一個用於面向應用,一個用於對應用數據庫的實時備份。

3. 實時收集關系型數據庫變更,將數據保存在nosql數據庫中,以提供快速檢索,一個較為實用的場景就是實現地將mysql數據變更同步到elastic search 或者 mongo db。

下面,將介紹如何通過canal,將mysql 數據變更同步到elastic search 。

首先我們了解一下什么是canal?

mysql主備復制實現


從上層來看,復制分成三步:

  1. master將改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行查看);
  2. slave將master的binary log events拷貝到它的中繼日志(relay log);
  3. slave重做中繼日志中的事件,將改變反映它自己的數據。

canal的工作原理:

原理相對比較簡單:

    1. canal模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議
    2. mysql master收到dump請求,開始推送binary log給slave(也就是canal)
    3. canal解析binary log對象(原始為byte流)

 

 

安裝步驟:

 

訪問:https://github.com/alibaba/canal/releases ,會列出所有歷史的發布版本包 下載方式,比如以1.0.17版本為例子:

wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz

mkdir /tmp/canal
tar zxvf canal.deployer-1.0.17.tar.gz  -C /tmp/canal

 

配置修改

vi conf/example/instance.properties
#################################################
## mysql serverId
canal.instance.mysql.slaveId = 1234
#position info,需要改成自己的數據庫信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =


#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =


#username/password,需要改成自己的數據庫信息
canal.instance.dbUsername = canal

canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8


#table regex
canal.instance.filter.regex = .\..


#################################################

准備啟動

 sh bin/startup.sh

查看日志

vi logs/canal/canal.log

 

關閉

sh bin/stop.sh

 

下面試下在代碼中,獲取到mysql變更:

首先安裝下 canal 客戶端 nuget包

Install-Package CanalSharp.Client

 

static void Main(string[] args)
        {
            //canal 配置的 destination,默認為 example
            var destination = "example";
            //創建一個簡單 CanalClient 連接對象(此對象不支持集群)傳入參數分別為 canal 地址、端口、destination、用戶名、密碼
            var connector = CanalConnectors.NewSingleConnector("192.168.1.23", 11111, destination, "", "");
            //連接 Canal
            connector.Connect();
            //訂閱,同時傳入 Filter。Filter是一種過濾規則,通過該規則的表數據變更才會傳遞過來
            //允許所有數據 .*\\..*
            //允許某個庫數據 庫名\\..*
            //允許某些表 庫名.表名,庫名.表名
            connector.Subscribe(".*\\..*");
            while (true)
            {
                //獲取數據 1024表示數據大小 單位為字節
                var message = connector.Get(1024);
                //批次id 可用於回滾
                var batchId = message.Id;
                if (batchId == -1 || message.Entries.Count <= 0)
                {
                    Thread.Sleep(300);
                    continue;
                }

                PrintEntry(message.Entries);
            }
        }

        /// <summary>
        /// 輸出數據
        /// </summary>
        /// <param name="entrys">一個entry表示一個數據庫變更</param>
        private static void PrintEntry(List<Entry> entrys)
        {
            foreach (var entry in entrys)
            {
                if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
                {
                    continue;
                }

                RowChange rowChange = null;

                try
                {
                    //獲取行變更
                    rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
                }
                catch (Exception e)
                {
                    Console.WriteLine(e.Message);
                }

                if (rowChange != null)
                {

                    //by the changed entry's table name and record id. get the changed order(full info with any children records) form mysql and save it to es.

                    //to do it, boys !


                    //變更類型 insert/update/delete 等等
                    EventType eventType = rowChange.EventType;
                    //輸出binlog信息 表名 數據庫名 變更類型
                    Console.WriteLine(
                        $"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}");

                    //輸出 insert/update/delete 變更類型列數據
                    foreach (var rowData in rowChange.RowDatas)
                    {
                        if (eventType == EventType.Delete)
                        {
                            PrintColumn(rowData.BeforeColumns.ToList());
                        }
                        else if (eventType == EventType.Insert)
                        {
                            PrintColumn(rowData.AfterColumns.ToList());
                        }
                        else
                        {
                            Console.WriteLine("-------> before");
                            PrintColumn(rowData.BeforeColumns.ToList());
                            Console.WriteLine("-------> after");
                            PrintColumn(rowData.AfterColumns.ToList());
                        }
                    }
                }
            }
        }

        /// <summary>
        /// 輸出每個列的詳細數據
        /// </summary>
        /// <param name="columns"></param>
        private static void PrintColumn(List<Column> columns)
        {
            foreach (var column in columns)
            {
                //輸出列明 列值 是否變更
                Console.WriteLine($"{column.Name} : {column.Value}  update=  {column.Updated}");
            }
        }
    }

 

運行代碼,去到數據庫中改一下某行數據:

 

可以看到我們代碼收集到變更信息:

 

 

本篇就介紹到這里了, 至於如何將變更同步到es,那是屬於es操作的范疇,可參考 https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/elasticsearch-net.html

 

以上內容源於:

https://github.com/alibaba/canal/wiki/QuickStart

https://github.com/dotnetcore/CanalSharp

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM