背景:
1. 一些項目的基礎功能會有Audit Trace, 以記錄系統用戶所做過的所有記錄。
2. 實時備份數據,比如mysql主從復制,一個用於面向應用,一個用於對應用數據庫的實時備份。
3. 實時收集關系型數據庫變更,將數據保存在nosql數據庫中,以提供快速檢索,一個較為實用的場景就是實現地將mysql數據變更同步到elastic search 或者 mongo db。
下面,將介紹如何通過canal,將mysql 數據變更同步到elastic search 。
首先我們了解一下什么是canal?
mysql主備復制實現
- master將改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行查看);
- slave將master的binary log events拷貝到它的中繼日志(relay log);
- slave重做中繼日志中的事件,將改變反映它自己的數據。
canal的工作原理:
原理相對比較簡單:
- canal模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議
- mysql master收到dump請求,開始推送binary log給slave(也就是canal)
- canal解析binary log對象(原始為byte流)
安裝步驟:
訪問:https://github.com/alibaba/canal/releases ,會列出所有歷史的發布版本包 下載方式,比如以1.0.17版本為例子:
wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz mkdir /tmp/canal tar zxvf canal.deployer-1.0.17.tar.gz -C /tmp/canal
配置修改
vi conf/example/instance.properties
################################################# ## mysql serverId canal.instance.mysql.slaveId = 1234 #position info,需要改成自己的數據庫信息 canal.instance.master.address = 127.0.0.1:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #username/password,需要改成自己的數據庫信息 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName = canal.instance.connectionCharset = UTF-8 #table regex canal.instance.filter.regex = .\.. #################################################
准備啟動
sh bin/startup.sh
查看日志
vi logs/canal/canal.log
關閉
sh bin/stop.sh
下面試下在代碼中,獲取到mysql變更:
首先安裝下 canal 客戶端 nuget包
Install-Package CanalSharp.Client
static void Main(string[] args) { //canal 配置的 destination,默認為 example var destination = "example"; //創建一個簡單 CanalClient 連接對象(此對象不支持集群)傳入參數分別為 canal 地址、端口、destination、用戶名、密碼 var connector = CanalConnectors.NewSingleConnector("192.168.1.23", 11111, destination, "", ""); //連接 Canal connector.Connect(); //訂閱,同時傳入 Filter。Filter是一種過濾規則,通過該規則的表數據變更才會傳遞過來 //允許所有數據 .*\\..* //允許某個庫數據 庫名\\..* //允許某些表 庫名.表名,庫名.表名 connector.Subscribe(".*\\..*"); while (true) { //獲取數據 1024表示數據大小 單位為字節 var message = connector.Get(1024); //批次id 可用於回滾 var batchId = message.Id; if (batchId == -1 || message.Entries.Count <= 0) { Thread.Sleep(300); continue; } PrintEntry(message.Entries); } } /// <summary> /// 輸出數據 /// </summary> /// <param name="entrys">一個entry表示一個數據庫變更</param> private static void PrintEntry(List<Entry> entrys) { foreach (var entry in entrys) { if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend) { continue; } RowChange rowChange = null; try { //獲取行變更 rowChange = RowChange.Parser.ParseFrom(entry.StoreValue); } catch (Exception e) { Console.WriteLine(e.Message); } if (rowChange != null) { //by the changed entry's table name and record id. get the changed order(full info with any children records) form mysql and save it to es. //to do it, boys ! //變更類型 insert/update/delete 等等 EventType eventType = rowChange.EventType; //輸出binlog信息 表名 數據庫名 變更類型 Console.WriteLine( $"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}"); //輸出 insert/update/delete 變更類型列數據 foreach (var rowData in rowChange.RowDatas) { if (eventType == EventType.Delete) { PrintColumn(rowData.BeforeColumns.ToList()); } else if (eventType == EventType.Insert) { PrintColumn(rowData.AfterColumns.ToList()); } else { Console.WriteLine("-------> before"); PrintColumn(rowData.BeforeColumns.ToList()); Console.WriteLine("-------> after"); PrintColumn(rowData.AfterColumns.ToList()); } } } } } /// <summary> /// 輸出每個列的詳細數據 /// </summary> /// <param name="columns"></param> private static void PrintColumn(List<Column> columns) { foreach (var column in columns) { //輸出列明 列值 是否變更 Console.WriteLine($"{column.Name} : {column.Value} update= {column.Updated}"); } } }
運行代碼,去到數據庫中改一下某行數據:

可以看到我們代碼收集到變更信息:

本篇就介紹到這里了, 至於如何將變更同步到es,那是屬於es操作的范疇,可參考 https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/elasticsearch-net.html
以上內容源於:
https://github.com/alibaba/canal/wiki/QuickStart
https://github.com/dotnetcore/CanalSharp


