一、canal安裝與配置
1、電腦中首先需要下載安裝canal,可以去阿里的github上面下載(更多版本選擇),也可以從下面的地址進行提取
鏈接:https://pan.baidu.com/s/1oysSnGP-e_Zw6eEk9NT8rg
提取碼:j33k
canal支持多種語言使用。
2、下載完成,將它解壓,如:
3、 修改D:\Tools\alibaba\canal\conf\example下的 instance.properties(剛解壓出的example文件夾中,可以將除了instance.properties文件以外的文件全部刪除)
修改instance.properties文件的相關mysql數據庫配置
4、完成配置以后,進入bin文件夾,windows雙擊startup.bat文件(注意,canal需要java運行環境,如果電腦沒有java環境的,可以只配置一個jre)
如上圖顯示,canal就正常運行了。接下來就只需要開啟mysql的binlog日志,在自己的程序中使用canal即可
二、對 mysql的操作
1、查看binlog是否啟用,在mysql中執行 show variables like '%log_bin%';
2、電腦,右鍵管理,打開服務,找到mysql的my.ini配置文件位置,添加
server_id=1918
log_bin = mysql-bin
#binlog_format="ROW"
binlog_format="MIXED" #開啟MIXED模式
#binlog_format="STATEMENT"
3、重啟mysql服務就可以啟用binlog。開啟MIXED模式才可看見執行的語句
------三種模式詳細解釋:
互聯網公司使用MySQL的功能較少(不用存儲過程、觸發器、函數),選擇默認的Statement level
用到MySQL的特殊功能(存儲過程、觸發器、函數)則選擇Mixed模式
用到MySQL的特殊功能(存儲過程、觸發器、函數),又希望數據最大化一直則選擇Row模式
完成對mysql的配置,可以在mysql的安裝目錄下的data文件夾中看到000001文件,就是binlog日志文件。如:
三、C#代碼
阿里的github上面也有詳細的說明。
1、首先需要用到canal的包,去nuget下載CanalSharp.Client
2、在代碼中添加引用
using CanalSharp.Client.Impl;
//canal 配置的 destination,默認為 example var destination = "example"; //創建一個簡單CanalClient連接對象(此對象不支持集群)傳入參數分別為 canal地址、端口、destination、用戶名、密碼 var connector = CanalConnectors.NewSingleConnector("127.0.0.1", 11111, destination, "", "");//這里可以就這樣 //連接 Canal connector.Connect(); //訂閱,同時傳入Filter,如果不傳則以Canal的Filter為准。Filter是一種過濾規則,通過該規則的表數據變更才會傳遞過來 //允許所有數據 .*\\..* //允許某個庫數據 庫名\\..* //允許某些表 庫名.表名,庫名.表名" connector.Subscribe(".*\\..*"); while (true) { //獲取數據 1024表示數據大小 單位為字節 var message = connector.Get(1024); //批次id 可用於回滾 var batchId = message.Id; if (batchId == -1 || message.Entries.Count <= 0) { Thread.Sleep(300); continue; } PrintEntrys(message.Entries); } /// <summary> /// 輸出數據 /// </summary> /// <param name="entrys">一個entry表示一個數據庫變更</param> private void PrintEntrys(List<Entry> entrys) { foreach (var entry in entrys) { if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend) { continue; } RowChange rowChange = null; try { //獲取行變更 rowChange = RowChange.Parser.ParseFrom(entry.StoreValue); } catch (Exception ex) { logMan.Error("mysql獲取行變更錯誤:" + ex.Message, ex); } if (rowChange != null) { //變更類型 insert/update/delete 等等 EventType eventType = rowChange.EventType; //輸出 insert/update/delete 變更類型列數據 foreach (var rowData in rowChange.RowDatas) { if (eventType == EventType.Delete) { PrintColumn(rowData.BeforeColumns.ToList(), entry.Header.TableName); } else if (eventType == EventType.Insert)//添加 { PrintColumn(rowData.AfterColumns.ToList(), entry.Header.TableName); } else { if (entry.Header.TableName == "tablename1")//這里可以過濾自己想監測的表名 PrintColumn(rowData.AfterColumns.ToList(), entry.Header.TableName, rowData.BeforeColumns.ToList()); } } } } } private void PrintColumn(List<Column> columns, string tbName, List<Column> oldColumns = null) { //修改的值,以及修改列的id值 string newNum = "", userid = ""; foreach (var column in columns) { if (column.Updated) { //輸出列 列值 是否變更 也可以拼出執行的語句 Console.WriteLine($"{column.Name} : {column.Value} update= {column.Updated}"); } } }
附上一個完整的控制台應用。完成一、二中對mysql,canal的配置后,可以直接運行下列的控制台進行測試
using CanalSharp.Client.Impl; using Com.Alibaba.Otter.Canal.Protocol; using System; using System.Collections.Generic; using System.Linq; using System.Threading; namespace ConsoleApp1 { class Program { static void Main(string[] args) { //canal 配置的 destination,默認為 example var destination = "example"; //創建一個簡單 CanalClient 連接對象(此對象不支持集群)傳入參數分別為 canal 地址、端口、destination、用戶名、密碼 var connector = CanalConnectors.NewSingleConnector("127.0.0.1", 11111, destination, "", ""); //連接 Canal connector.Connect(); //訂閱,同時傳入 Filter。Filter是一種過濾規則,通過該規則的表數據變更才會傳遞過來 //允許所有數據 .*\\..* //允許某個庫數據 庫名\\..* //允許某些表 庫名.表名,庫名.表名 connector.Subscribe(".*\\..*"); Console.WriteLine("監控創建成功,開始監控。"); while (true) { //獲取數據 1024表示數據大小 單位為字節 var message = connector.Get(1024); //批次id 可用於回滾 var batchId = message.Id; if (batchId == -1 || message.Entries.Count <= 0) { Thread.Sleep(300); continue; } PrintEntry(message.Entries); } } /// <summary> /// 輸出數據 /// </summary> /// <param name="entrys">一個entry表示一個數據庫變更</param> private static void PrintEntry(List<Entry> entrys) { Console.WriteLine("監控到改動,進入監控輸出。"); foreach (var entry in entrys) { if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend) { continue; } RowChange rowChange = null; try { //獲取行變更 rowChange = RowChange.Parser.ParseFrom(entry.StoreValue); Console.WriteLine("行變更。"); } catch (Exception ex) { //_logger.LogError(ex.ToString()); } if (rowChange != null) { //變更類型 insert/update/delete 等等 EventType eventType = rowChange.EventType; //輸出binlog信息 表名 數據庫名 變更類型 //_logger.LogInformation( // $"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}"); Console.WriteLine("改變的數據庫名:" + entry.Header.SchemaName); //輸出 insert/update/delete 變更類型列數據 foreach (var rowData in rowChange.RowDatas) { if (eventType == EventType.Delete) { PrintColumn(rowData.BeforeColumns.ToList(), entry.Header.TableName); } else if (eventType == EventType.Insert) { PrintColumn(rowData.AfterColumns.ToList(), entry.Header.TableName); } else { PrintColumn(rowData.AfterColumns.ToList(), entry.Header.TableName); } } } } } /// <summary> /// 輸出每個列的詳細數據 /// </summary> /// <param name="columns"></param> private static void PrintColumn(List<Column> columns, string tbName) { Console.WriteLine("有改動"); foreach (var column in columns) { //if (column.Name == lookName) userid = column.Value; //Console.WriteLine($"{column.Name} : {column.Value} update= {column.Updated}"); if (column.Updated) { //輸出列明 列值 是否變更 Console.WriteLine($"修改的表名:{tbName},列名:{column.Name} : {column.Value} update= {column.Updated}"); //修改以后的值 column.Name= //if (column.Name == alterName) newNum = column.Value; //if (column.Name == lookName) userid = column.Value; //Console.WriteLine($"update {tbName} set {alterName}={column.Value} where {lookName}={userid}"); } } } } }