C# 使用Canal監控Mysql


一、canal安裝與配置

1、電腦中首先需要下載安裝canal,可以去阿里的github上面下載(更多版本選擇),也可以從下面的地址進行提取

鏈接:https://pan.baidu.com/s/1oysSnGP-e_Zw6eEk9NT8rg
提取碼:j33k

canal支持多種語言使用。

 

 

 2、下載完成,將它解壓,如:

  

3、 修改D:\Tools\alibaba\canal\conf\example下的 instance.properties(剛解壓出的example文件夾中,可以將除了instance.properties文件以外的文件全部刪除)

 

修改instance.properties文件的相關mysql數據庫配置

 

4、完成配置以后,進入bin文件夾,windows雙擊startup.bat文件(注意,canal需要java運行環境,如果電腦沒有java環境的,可以只配置一個jre)

 

 

 

  如上圖顯示,canal就正常運行了。接下來就只需要開啟mysql的binlog日志,在自己的程序中使用canal即可

二、對 mysql的操作

1、查看binlog是否啟用,在mysql中執行 show variables like '%log_bin%';
2、電腦,右鍵管理,打開服務,找到mysql的my.ini配置文件位置,添加
server_id=1918
log_bin = mysql-bin
#binlog_format="ROW"
binlog_format="MIXED" #開啟MIXED模式
#binlog_format="STATEMENT"

3、重啟mysql服務就可以啟用binlog。開啟MIXED模式才可看見執行的語句

------三種模式詳細解釋:
互聯網公司使用MySQL的功能較少(不用存儲過程、觸發器、函數),選擇默認的Statement level

用到MySQL的特殊功能(存儲過程、觸發器、函數)則選擇Mixed模式

用到MySQL的特殊功能(存儲過程、觸發器、函數),又希望數據最大化一直則選擇Row模式

 完成對mysql的配置,可以在mysql的安裝目錄下的data文件夾中看到000001文件,就是binlog日志文件。如:

三、C#代碼

阿里的github上面也有詳細的說明。

1、首先需要用到canal的包,去nuget下載CanalSharp.Client

 

2、在代碼中添加引用

using CanalSharp.Client.Impl;

//canal 配置的 destination,默認為 example
var destination = "example";
//創建一個簡單CanalClient連接對象(此對象不支持集群)傳入參數分別為 canal地址、端口、destination、用戶名、密碼
var connector = CanalConnectors.NewSingleConnector("127.0.0.1", 11111, destination, "", "");//這里可以就這樣
//連接 Canal
connector.Connect();
//訂閱,同時傳入Filter,如果不傳則以Canal的Filter為准。Filter是一種過濾規則,通過該規則的表數據變更才會傳遞過來
//允許所有數據 .*\\..*
//允許某個庫數據 庫名\\..*
//允許某些表 庫名.表名,庫名.表名"
connector.Subscribe(".*\\..*");

while (true)
{
    //獲取數據 1024表示數據大小 單位為字節
    var message = connector.Get(1024);
    //批次id 可用於回滾
    var batchId = message.Id;
    if (batchId == -1 || message.Entries.Count <= 0)
    {
        Thread.Sleep(300);
        continue;
    }
    PrintEntrys(message.Entries);
}
/// <summary>
/// 輸出數據
/// </summary>
/// <param name="entrys">一個entry表示一個數據庫變更</param>
private void PrintEntrys(List<Entry> entrys)
{
    foreach (var entry in entrys)
    {
        if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
        {
            continue;
        }

        RowChange rowChange = null;
        try
        {
            //獲取行變更
            rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
        }
        catch (Exception ex)
        {
            logMan.Error("mysql獲取行變更錯誤:" + ex.Message, ex);
        }

        if (rowChange != null)
        {
            //變更類型 insert/update/delete 等等
            EventType eventType = rowChange.EventType;                   

            //輸出 insert/update/delete 變更類型列數據
            foreach (var rowData in rowChange.RowDatas)
            {
                if (eventType == EventType.Delete)
                {
                    PrintColumn(rowData.BeforeColumns.ToList(), entry.Header.TableName);
                }
                else if (eventType == EventType.Insert)//添加
                {
                    PrintColumn(rowData.AfterColumns.ToList(), entry.Header.TableName);
                }
                else
                {
                    if (entry.Header.TableName == "tablename1")//這里可以過濾自己想監測的表名
                        PrintColumn(rowData.AfterColumns.ToList(), entry.Header.TableName, rowData.BeforeColumns.ToList());
                }
            }
        }
    }
}
private void PrintColumn(List<Column> columns, string tbName, List<Column> oldColumns = null)
{
    //修改的值,以及修改列的id值
    string newNum = "", userid = "";
    foreach (var column in columns)
    {    
        if (column.Updated)
        {
            //輸出列 列值 是否變更 也可以拼出執行的語句
            Console.WriteLine($"{column.Name} : {column.Value}  update=  {column.Updated}");             
        }
    }
}

 附上一個完整的控制台應用。完成一、二中對mysql,canal的配置后,可以直接運行下列的控制台進行測試

using CanalSharp.Client.Impl;
using Com.Alibaba.Otter.Canal.Protocol;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

namespace ConsoleApp1
{
    class Program
    {
        static void Main(string[] args)
        {
            //canal 配置的 destination,默認為 example
            var destination = "example";
            //創建一個簡單 CanalClient 連接對象(此對象不支持集群)傳入參數分別為 canal 地址、端口、destination、用戶名、密碼
            var connector = CanalConnectors.NewSingleConnector("127.0.0.1", 11111, destination, "", "");
            //連接 Canal
            connector.Connect();
            //訂閱,同時傳入 Filter。Filter是一種過濾規則,通過該規則的表數據變更才會傳遞過來
            //允許所有數據 .*\\..*
            //允許某個庫數據 庫名\\..*
            //允許某些表 庫名.表名,庫名.表名
            connector.Subscribe(".*\\..*");
            Console.WriteLine("監控創建成功,開始監控。");
            while (true)
            {
                //獲取數據 1024表示數據大小 單位為字節
                var message = connector.Get(1024);
                //批次id 可用於回滾
                var batchId = message.Id;
                if (batchId == -1 || message.Entries.Count <= 0)
                {
                    Thread.Sleep(300);
                    continue;
                }

                PrintEntry(message.Entries);
            }
        }
        /// <summary>
        /// 輸出數據
        /// </summary>
        /// <param name="entrys">一個entry表示一個數據庫變更</param>
        private static void PrintEntry(List<Entry> entrys)
        {
            Console.WriteLine("監控到改動,進入監控輸出。");
            foreach (var entry in entrys)
            {
                if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
                {
                    continue;
                }

                RowChange rowChange = null;

                try
                {
                    //獲取行變更
                    rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
                    Console.WriteLine("行變更。");
                }
                catch (Exception ex)
                {
                    //_logger.LogError(ex.ToString());
                }

                if (rowChange != null)
                {
                    //變更類型 insert/update/delete 等等
                    EventType eventType = rowChange.EventType;
                    //輸出binlog信息 表名 數據庫名 變更類型
                    //_logger.LogInformation(
                    //    $"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}");

                    Console.WriteLine("改變的數據庫名:" + entry.Header.SchemaName);
                    //輸出 insert/update/delete 變更類型列數據
                    foreach (var rowData in rowChange.RowDatas)
                    {
                        if (eventType == EventType.Delete)
                        {
                            PrintColumn(rowData.BeforeColumns.ToList(), entry.Header.TableName);
                        }
                        else if (eventType == EventType.Insert)
                        {
                            PrintColumn(rowData.AfterColumns.ToList(), entry.Header.TableName);
                        }
                        else
                        {
                            PrintColumn(rowData.AfterColumns.ToList(), entry.Header.TableName);
                        }
                    }
                }
            }
        }

        /// <summary>
        /// 輸出每個列的詳細數據
        /// </summary>
        /// <param name="columns"></param>
        private static void PrintColumn(List<Column> columns, string tbName)
        {
            Console.WriteLine("有改動");
            foreach (var column in columns)
            {
                //if (column.Name == lookName) userid = column.Value;
                //Console.WriteLine($"{column.Name} : {column.Value}  update=  {column.Updated}");
                if (column.Updated)
                {
                    //輸出列明 列值 是否變更
                    Console.WriteLine($"修改的表名:{tbName},列名:{column.Name} : {column.Value}  update=  {column.Updated}");
                    //修改以后的值 column.Name=
                    //if (column.Name == alterName) newNum = column.Value;
                    //if (column.Name == lookName) userid = column.Value;
                    //Console.WriteLine($"update {tbName} set {alterName}={column.Value} where {lookName}={userid}");
                }
            }
        }
    }
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM