(1)Canal入門


1.前言

在我們系統開發過程中,根據業務場景很多數據庫數據並不會直接給用戶訪問的,需要同步保存到ElasticSearch、Redis等存儲應用當中(例如最常見的是搜索頁面的ElasticSearch數據)。而阿里開源的框架Canal就是做這方面的功能,它可以把數據庫(暫時只支持MySQL和Oracle部分版本)日志解析獲取增量變更同步到其他存儲應用去。

2.什么是Canal?

官網介紹,canal [kə'næl],譯意為水道/管道/溝渠,主要用途是基於MySQL數據庫增量日志解析,提供增量數據訂閱和消費。
從上述介紹我們可以簡單認為Canal就是一個簡單的增量數據同步工具。

2.1MySQL主備復制原理

根據官網介紹,MySQL主備復制原理如下:
 
●MySQL master(主庫)將數據變更(增刪改)寫入二進制日志(binary log,其中記錄叫做二進制日志事件binary log events,可以通過show binlog events進行查看)。
●MySQL slave(從庫)將master的binary log events 拷貝到它的中繼日志(relay log)。
●MySQL slave(從庫)重放relay log中事件,將數據變更反映它自己的數據。

2.2Canal工作原理

根據官網介紹,Canal工作原理如下圖所示:
 
●canal模擬MySQL slave的交互協議,偽裝自己為MySQL slave,向MySQL master發送dump協議。
●MySQL master收到dump請求,開始推送binary log給slave(即canal )。
●canal解析binary log對象(原始為byte流),再推送到MySQL、kafka、ElasticSearch等存儲應用當中。

3.Canal能做什么?

早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基於業務trigger(觸發器)獲取增量變更。從2010年開始,業務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。所以Canal就是在這個場景中誕生的,它主要作用就是解決基於日志增量訂閱和消費的業務,例如:
●數據庫鏡像。
●數據庫實時備份。
●索引構建和實時維護(拆分異構索引、倒排索引等)。
●業務緩存刷新。
●帶業務邏輯的增量數據處理(例如ElasticSearch、Redis數據同步)。
在我做過的項目中,cancal經常被用到如下場景:
●根據數據庫的數據變更實時更新搜索引擎數據,比如我司電商場景下物料數據發生變更(例如后台上傳更新物料信息、價格),實時同步到搜索引擎Elasticsearch上。
●根據數據庫的數據變更實時更新緩存,比如專門運營人員每次修改物料品牌信息同時都會同步到Redis上。
●根據數據庫的數據變更實時推送到消息隊列,比如為了豐富自身系統物料庫存,定時作業拉取第三方渠道物料庫存推送到RabbitMQ等消息隊列去消費入庫。

4.如何搭建Canal?

4.1首先得安裝個MySQL數據庫

如果已經安裝好MySQL數據庫的,這一步可以跳過,如果沒有安裝好,請自行安裝(也可以查看我之前寫過一篇MySQL安裝教程,不過個人建議最好還是在Docker上安裝,簡單方便快捷,如果自己手動安裝,不懂點運維基礎知識,坑太多了),具體安裝教程度娘一堆資料。當前的canal支持MySQL版本包括5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
我的MySQL版本是8.0.23,所以canal是支持的。不知道自己安裝是什么版本可以通過SELECT VERSION();命令查看。

4.2數據庫配置

從上述可知,因為canal是模擬MySQL slave的交互協議,偽裝自己為MySQL slave,向MySQL master發送dump協議獲取binary log內容對象的,所以需要MySql開啟binlog。
●修改mysql.cnf中的配置:

-- 編輯mysql.cnf文件
vim /etc/my.cnf;
-- 在my.cnf上加入如下配置
[mysqld]
log-bin=mysql-bin #開啟binlog
binlog-format=ROW #選擇ROW模式
server_id=1 #配置MySQL replaction需要定義,不要和canal的slaveId重復
expire-logs-days=10 #binlog日志保留的天數,清除超過10天的日志,防止日志文件過大,導致磁盤空間不足

●授權canal鏈接MySQL賬號具有作為MySQL slave的權限, 如果已有賬戶可直接grant(我這邊是根據官網示例創建一個canal賬號來演示):

-- 先登錄MySQL
mysql -u root -p
-- 創建用戶,用戶名:canal,密碼:qwer1234
CREATE USER canal IDENTIFIED BY 'qwer1234';   
-- 授予上的所有權限給canal用戶;
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- 刷新權限;
FLUSH PRIVILEGES;

●查看下MySql是否開啟binlog日志
是否開啟binlog日志:

SHOW VARIABLES LIKE 'log_bin';

 
查看binlog日志文件列表:

SHOW BINARY LOGS;

 
查看當前正在寫入的binlog文件:

SHOW MASTER STATUS;

 

4.3Canal配置

安裝運行Canal服務端,一定要記得先檢查當前Linux系統是否安裝了java8環境,如果沒有安裝啟動Canal時候會有如下提示:

[root@dengwu canal]# sh bin/startup.sh
which: no java in (/data/mysql/bin:/data/mysql/lib:/usr/lib64/qt-3.3/bin:/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/root/bin)
Cannot find a Java JDK. Please set either set JAVA or put java (>=1.5) in your PATH.

 
具體安裝步驟如下:
●先從Oracle官網下載JDK安裝包:
 
通過Xftp工具導入到預先創建/app/package安裝包目錄下,再在/usr目錄下創建java目錄並解壓:

mkdir /usr/java
cd /app/package
tar zxvf jdk-8u291-linux-x64.tar.gz  -C /usr/java

然后配置java環境變量:

vim /etc/profile

用vim編輯器來編輯profile文件,輸入i在文件末尾添加以下內容:

export JAVA_HOME=/usr/java/jdk1.8.0_291
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib:$CLASSPATH
export JAVA_PATH=${JAVA_HOME}/bin:${JRE_HOME}/bin
export PATH=$PATH:${JAVA_PATH}

 
配置完java環境變量后,:wq保存退出,看看是否生效:

echo $PATH

如果沒有生效,讓其生效:

source /etc/profile

再瞄瞄java8是否安裝成功:

java -version

 
●然后下載canal, 訪問release頁面, 選擇需要的包下載, 如最新版本1.1.5為例:
 
可以使用wget工具下載:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz

或者手動下載,通過Xftp等工具拉入安裝包目錄(/app/package)中:
 
再創建canal安裝目錄解壓安裝包:

mkdir /app/canal
cd /app/package
tar zxvf canal.deployer-1.1.5.tar.gz -C /app/canal

然后修改配置:

cd /app/canal
vi conf/example/instance.properties
i
:wq

 
●啟動canal:

cd /app/canal
sh bin/startup.sh

注:Windows使用startup.bat啟動
●查看canal進程是否啟動成功:

ps -ef | grep canal

●查看instance的日志:

vi logs/example/example.log

●關閉canal:

sh bin/stop.sh

●在數據庫中查看從庫信息:

SHOW SLAVE HOSTS;

 
查看下canal實例(example)配置是否成功。
●記得把canal端口加入防火牆策略去:

-- 允許通過防火牆
firewall-cmd --permanent --zone=public --add-port=11111/tcp
-- 從防火牆里移除
firewall-cmd --permanent --zone=public --remove-port=11111/tcp
-- 查看端口在防火牆狀態
firewall-cmd --permanent --zone=public --query-port=11111/tcp
-- 重啟防火牆
firewall-cmd --reload

注:如果是買阿里雲服務器,要到阿里雲安全組添加允許通過策略。還有Canal Server的默認端口為:11111,若需要修改,可以去/canal/conf目錄下的canal.properties配置文件中進行修改。

5.Canal的.NET客戶端CanalSharp使用

5.1快速入門

●先安裝客戶端:

Install-Package CanalSharp

●初始化日志:
CanalSharp使用Microsoft.Extensions.Logging.Abstractions,因為目前主流日志組件,如:nlog、serilog等,全部支持此日志抽象接入,也就是說你可以通過安裝nlog、serilog對其的適配,來使用它們,無論是Console App或則是Web App。

var loggerFactory = LoggerFactory.Create(builder =>
{
    builder
        .AddFilter("Microsoft", LogLevel.Debug)
        .AddFilter("System", LogLevel.Information)
        .AddConsole();
});
var logger= loggerFactory.CreateLogger<SimpleCanalConnection>();

●創建連接:

var conn=new SimpleCanalConnection(new SimpleCanalOptions("127.0.0.1",11111,1234),logger);
await conn.ConnectAsync();//連接到Canal Server
await conn.SubscribeAsync();//訂閱

●獲取數據:

var msg = await conn.GetAsync(1024);

5.2進階使用

●解析數據
○Entry
上文conn.GetAsync()返回的是一個Entry集合,Entry對應binlog記錄,它可能是事務標記也有可能是行數據變化,通過Entry.EntryType來區分,一般事務的標記在業務消費處理時不需要處理。
示例:

var entries = await conn.GetAsync(1024);
foreach (var entry in entries)
{
    //不處理事務標記
    if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend)
    {
        continue;
    }
}

Entry.Header包含了一些binlog以及數據庫信息:

屬性

說明

Entry.Header.LogfileName

binlog文件名

Entry.Header.LogfileOffset

binlog偏移

Entry.Header.SchemaName

mysql schema名稱

Entry.Header.TableName

表名

○RowChange
一般在業務處理中,都會需要行數據的變更,將Entry轉換為RowChange對象。
示例:

RowChange rowChange = null;
try
{
    rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
}
catch (Exception e)
{
    _logger.LogError(e);
}

通過RowChange.EventType來Row是什么變化,Update、Delete和Insert對應sql中的update、delete和insert語句,通過RowChange.RowDatas屬性,來訪問RowChange對象中包含的行變化數據集合。示例,遍歷 RowChange 中的行數據:

foreach (var rowData in rowChange.RowDatas)
{
    //刪除的數據
    if (eventType == EventType.Delete)
    {
        PrintColumn(rowData.BeforeColumns.ToList());
    }
    //插入的數據
    else if (eventType == EventType.Insert)
    {
        PrintColumn(rowData.AfterColumns.ToList());
    }
    //更新的數據
    else
    {
        _logger.LogInformation("-------> before");
        PrintColumn(rowData.BeforeColumns.ToList());
        _logger.LogInformation("-------> after");
        PrintColumn(rowData.AfterColumns.ToList());
    }
}

private static void PrintColumn(List<Column> columns)
{
    foreach (var column in columns)
    {
        Console.WriteLine($"{column.Name} : {column.Value}  update=  {column.Updated}");
    }
}

○Column
Column如其名,代表數據庫中表的每一列的信息:

屬性名

說明

Column.Name

列名

Column.Value

列的值

Column.Updated

列是否被更新

5.3應答機制

應答機制可以保證消費數據的准確性,Canal服務端會記錄Client消費的進度,需要客戶端發送ACK消息,服務端才會更新進度。類似於在消息隊列中的ACK機制,如RabbitMQ。

●自動應答

await conn.GetAsync(1024);//獲取數據並自動應答
GetAsync()會在獲取數據后,自動向Server發送ack消息。

●手動應答

var msg = await conn.GetWithoutAckAsync(1024);//獲取數據
await conn.AckAsync(msg.Id);//手動應答
await conn.RollbackAsync(msg.Id);//回滾

5.4高可用

這里的高可用分為兩類,客戶端集群和服務端集群。都是采用冷備模式,因為對於binlog數據消費來說,並行處理將會帶來數據順序錯亂的問題,當然你可以通過一些復雜的機制去實現,這里不做說明。集群部署需要Zookeeper組件。
●服務端集群
在conf/canal.properties文件中修改zookeeper地址:

canal.zkServers=127.0.0.1:2181

集群中每個實例需配置相同的zookeeper地址。
●客戶端集群
客戶端集群和服務端集群采用相同的模式,每個實例去搶占鎖,獲得了鎖那么這個實例就運行獲取數據,其他實例做冷備。若正在運行消費數據的實例由於網絡波動,導致和zookeeper失去連接,那么其他客戶端實例不會立即搶占,會等待60s后才執行搶占,給與這個實例恢復的機會。
客戶端集群使用的連接對象和快速入門中的不同:ClusterCanalConnection,但使用方法基本相同。
示例:

//初始化日志
var loggerFactory = LoggerFactory.Create(builder =>
            {
                builder
                    .AddFilter("Microsoft", LogLevel.Debug)
                    .AddFilter("System", LogLevel.Information)
                    .AddConsole();
            });
var logger = loggerFactory.CreateLogger<Program>();
//設置zk地址和clientid,統一集群的client必須相同
var conn = new ClusterCanalConnection( new ClusterCanalOptions("localhost:2181", "12350"),loggerFactory);
//連接到Server                                      
await conn.ConnectAsync();
//訂閱
await conn.SubscribeAsync();
await conn.RollbackAsync(0);
while (true)
{
    try
    {
        //獲取數據
        var msg = await conn.GetAsync(1024);
    }
    catch (Exception e)
    {
        _logger.LogError(e,"Error.");
        //發生異常執行重連,此方法只有集群連接對象才有
        await conn.ReConnectAsync();
    }
}

5.5訂閱

訂閱指過濾表(table)的規則,Canal客戶端發送給客戶端訂閱規則,那么服務端將會推送符合規則的表數據過來,采用正則匹配。
允許所有表:.\*\\\\..\*

6.小結

這里這是簡單介紹Canal工作原理,能做什么,還有.NET客戶端CanalSharp使用,其實Canal涉及知識點還是很多的,例如配置MQ模式、服務集群、Web管理界面部署,多實例等等。后面如果有時間,我還會繼續花時間去學習。

參考文獻:
CanalSharp文檔
CanalSharp


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM