阿里巴巴MySQL開源中間件Canal入門


image

前言

距離上一篇文章發布又過去了兩周,這次先填掉上一篇秒殺系統文章結尾處開的坑,介紹一下數據庫中間件Canal的使用。

Canal用途很廣,並且上手非常簡單,小伙伴們在平時完成公司的需求時,很有可能會用到。

舉個例子:

公司目前有多個開發人員正在開發一套服務,為了縮短調用延時,對部分接口數據加入了緩存。一旦這些數據在數據庫中進行了更新操作,緩存就成了舊數據,必須及時刪除。

刪除緩存的代碼理所當然可以寫在更新數據的業務代碼里,但有時候者寫操作是在別的項目代碼里,你可能無權修改,亦或者別人不願你在他代碼里寫這種業務之外的代碼。(畢竟多人協作中間會產生各種配合問題)。又或者就是單純的刪除緩存的操作失敗了,緩存依然是舊數據。

正如上篇文章緩存與數據庫雙寫一致性實戰里面所說,我們可以將緩存更新操作完全獨立出來,形成一套單獨的系統。Canal正是這么一個很好的幫手。 能幫我們實現像下圖這樣的系統:

image

本篇文章的要點如下:

  • Canal是什么
  • Canal工作原理
  • 數據庫的讀寫分離
  • 數據庫主從同步
  • 數據庫主從同步一致性問題
    • 異步復制
    • 全同步復制
    • 半同步復制
  • Canal實戰
    • 開啟MySQL Binlog
    • 配置Canal服務
    • 運行Canal服務
    • Java客戶端Demo

歡迎關注我的個人公眾號獲取最全的原創文章:后端技術漫談(二維碼見文章底部)

阿里開源MySQL中間件Canal快速入門

Canal是什么

眾所周知,阿里是國內比較早地大量使用MySQL的互聯網企業(去IOE化:去掉IBM的小型機、Oracle數據庫、EMC存儲設備,代之以自己在開源軟件基礎上開發的系統),並且基於阿里巴巴/淘寶的業務,從 2010 年開始,業務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。

Canal應運而生,它通過偽裝成數據庫的從庫,讀取主庫發來的binlog,用來實現數據庫增量訂閱和消費業務需求

Canal用途:

  • 數據庫鏡像
  • 數據庫實時備份
  • 索引構建和實時維護(拆分異構索引、倒排索引等)
  • 業務 cache 緩存刷新
  • 帶業務邏輯的增量數據處理

開源項目地址:

https://github.com/alibaba/canal

在這里就不再摘抄項目簡介了,提煉幾個值得注意的點:

  • canal 使用 client-server 模式,數據傳輸協議使用 protobuf 3.0(很多RPC框架也在使用例如gRPC)
  • 當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
  • canal 作為 MySQL binlog 增量獲取和解析工具,可將變更記錄投遞到 MQ 系統中,比如 Kafka/RocketMQ。

Canal工作原理

Canal實際是將自己偽裝成數據庫的從庫,來讀取Binlog。我們先補習下關於MySQL數據庫主從數據庫的基礎知識,這樣就能更快的理解Canal。

數據庫的讀寫分離

為了應對高並發場景,MySQL支持把一台數據庫主機分為單獨的一台寫主庫(主要負責寫操作),而把讀的數據庫壓力分配給讀的從庫,而且讀從庫可以變為多台,這就是讀寫分離的典型場景。

image

數據庫主從同步

實現數據庫的讀寫分離,是通過數據庫主從同步,讓從數據庫監聽主數據庫Binlog實現的。大體流程如下圖:

MySQL master 將數據變更寫入二進制日志( binary log, 其中記錄叫做二進制日志事件binary log events,可以通過 show binlog events 進行查看)

MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)

MySQL slave 重放 relay log 中事件,將數據變更反映它自己的數據

image

詳細主從同步原理在這里就不展開細說了。

可以看到,這種架構下會有一個問題,數據庫主從同步會存在延遲,那么就會有短暫的時間,主從數據庫的數據是不一致的。

這種不一致大多數情況下非常短暫,很多時候我們可以忽略他。

但一旦要求數據一致,就會引申出如何解決這個問題的思考。

數據庫主從同步一致性問題

我們通常使用MySQL主從復制來解決MySQL的單點故障問題,其通過邏輯復制的方式把主庫的變更同步到從庫,主備之間無法保證嚴格一致的模式,

於是,MySQL的主從復制帶來了主從“數據一致性”的問題。MySQL的復制分為:異步復制、半同步復制、全同步復制。

異步復制

MySQL默認的復制即是異步復制,主庫在執行完客戶端提交的事務后會立即將結果返給給客戶端,並不關心從庫是否已經接收並處理,這樣就會有一個問題,主如果crash掉了,此時主上已經提交的事務可能並沒有傳到從庫上,如果此時,強行將從提升為主,可能導致新主上的數據不完整。

主庫將事務 Binlog 事件寫入到 Binlog 文件中,此時主庫只會通知一下 Dump 線程發送這些新的 Binlog,然后主庫就會繼續處理提交操作,而此時不會保證這些 Binlog 傳到任何一個從庫節點上。

全同步復制

指當主庫執行完一個事務,所有的從庫都執行了該事務才返回給客戶端。因為需要等待所有從庫執行完該事務才能返回,所以全同步復制的性能必然會收到嚴重的影響。

當主庫提交事務之后,所有的從庫節點必須收到、APPLY並且提交這些事務,然后主庫線程才能繼續做后續操作。但缺點是,主庫完成一個事務的時間會被拉長,性能降低。

半同步復制

是介於全同步復制與全異步復制之間的一種,主庫只需要等待至少一個從庫節點收到並且 Flush Binlog 到 Relay Log 文件即可,主庫不需要等待所有從庫給主庫反饋。同時,這里只是一個收到的反饋,而不是已經完全完成並且提交的反饋,如此,節省了很多時間。

介於異步復制和全同步復制之間,主庫在執行完客戶端提交的事務后不是立刻返回給客戶端,而是等待至少一個從庫接收到並寫到relay log中才返回給客戶端。相對於異步復制,半同步復制提高了數據的安全性,同時它也造成了一定程度的延遲,這個延遲最少是一個TCP/IP往返的時間。所以,半同步復制最好在低延時的網絡中使用。

image

事實上,半同步復制並不是嚴格意義上的半同步復制,MySQL半同步復制架構中,主庫在等待備庫ack時候,如果超時會退化為異步后,也可能導致“數據不一致”。

當半同步復制發生超時時(由rpl_semi_sync_master_timeout參數控制,單位是毫秒,默認為10000,即10s),會暫時關閉半同步復制,轉而使用異步復制。當master dump線程發送完一個事務的所有事件之后,如果在rpl_semi_sync_master_timeout內,收到了從庫的響應,則主從又重新恢復為半同步復制。

關於半同步復制的詳細原理分析可以看這篇引申文章,在此不展開:

https://www.cnblogs.com/ivictor/p/5735580.html

回到Canal的工作原理

回顧了數據庫從庫的數據同步原理,理解Canal十分簡單,直接引用官網原文:

  • canal 模擬 MySQL slave 的交互協議,偽裝自己為 MySQL slave ,向 MySQL master 發送dump 協議
  • MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
  • canal 解析 binary log 對象(原始為 byte 流)

Canal實戰

開啟MySQL Binlog

這個步驟我在之前的文章教你使用Binlog日志恢復誤刪的MySQL數據已經提到過,這里完善了一下,再貼一下,方便大家。

首先進入數據庫控制台,運行指令:

mysql> show variables like'log_bin%';
+---------------------------------+-------+
| Variable_name                   | Value |
+---------------------------------+-------+
| log_bin                         | OFF   |
| log_bin_basename                |       |
| log_bin_index                   |       |
| log_bin_trust_function_creators | OFF   |
| log_bin_use_v1_row_events       | OFF   |
+---------------------------------+-------+
5 rows in set (0.00 sec)

可以看到我們的binlog是關閉的,都是OFF。接下來我們需要修改Mysql配置文件,執行命令:

sudo vi /etc/mysql/mysql.conf.d/mysqld.cnf

在文件末尾添加:

log-bin=/var/lib/mysql/mysql-bin
binlog-format=ROW

保存文件,重啟mysql服務:

sudo service mysql restart

重啟完成后,查看下mysql的狀態:

systemctl status mysql.service

這時,如果你的mysql版本在5.7或更高版本,就會報錯:

Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.190791Z 0 [Warning] Changed limits: max_open_files: 1024 (requested 5000)
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.190839Z 0 [Warning] Changed limits: table_open_cache: 431 (requested 2000)
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.359713Z 0 [Warning] TIMESTAMP with implicit DEFAULT value is deprecated. Please use --explicit_defaults_for_timestamp server option (se
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.361395Z 0 [Note] /usr/sbin/mysqld (mysqld 5.7.28-0ubuntu0.16.04.2-log) starting as process 5930 ...
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363017Z 0 [ERROR] You have enabled the binary log, but you haven't provided the mandatory server-id. Please refer to the proper server
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363747Z 0 [ERROR] Aborting
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.363922Z 0 [Note] Binlog end
Jan 06 15:49:58 VM-0-11-ubuntu mysqld[5930]: 2020-01-06T07:49:58.364108Z 0 [Note] /usr/sbin/mysqld: Shutdown complete
Jan 06 15:49:58 VM-0-11-ubuntu systemd[1]: mysql.service: Main process exited, code=exited, status=1/FAILURE

You have enabled the binary log, but you haven't provided the mandatory server-id. Please refer to the proper server

之前我們的配置,對於5.7以下版本應該是可以的。但對於高版本,我們需要指定server-id。

我們給這個MySQL指定為2(只要不與其他庫id重復):

server-id=2

創建數據庫Canal使用賬號

mysql> select user, host from user;
+------------------+-----------+
| user             | host      |
+------------------+-----------+
| root             | %         |
| debian-sys-maint | localhost |
| mysql.session    | localhost |
| mysql.sys        | localhost |
| root             | localhost |
+------------------+-----------+
5 rows in set
CREATE USER canal IDENTIFIED BY 'xxxx';  (填寫密碼)  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';  
FLUSH PRIVILEGES;  

show grants for 'canal' 

配置Canal服務

去Github下載最近的Canal穩定版本包:

解壓縮:

mkdir /tmp/canal
tar zxvf canal.deployer-$version.tar.gz  -C /tmp/canal

配置文件設置:

主要有兩個文件配置,一個是conf/canal.properties一個是conf/example/instance.properties

為了快速運行Demo,只修改conf/example/instance.properties里的數據庫連接賬號密碼即可

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=xxxxxxx
canal.instance.connectionCharset = UTF-8

運行Canal服務

請先確保機器上有JDK,接着運行Canal啟動腳本:

sh bin/startup.sh

下圖即成功運行:

image

Java客戶端代碼

我在秒殺系統系列文章的代碼倉庫里(miaosha-job)編寫了如下客戶端代碼

倉庫源碼地址:https://github.com/qqxx6661/miaosha

package job;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class CanalClient {

    private static final Logger LOGGER = LoggerFactory.getLogger(CanalClient.class);

    public static void main(String[] args) {

        // 第一步:與canal進行連接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
                "example", "", "");
        connector.connect();

        // 第二步:開啟訂閱
        connector.subscribe();

        // 第三步:循環訂閱
        while (true) {
            try {
                // 每次讀取 1000 條
                Message message = connector.getWithoutAck(1000);

                long batchID = message.getId();

                int size = message.getEntries().size();

                if (batchID == -1 || size == 0) {
                    LOGGER.info("當前暫時沒有數據,休眠1秒");
                    Thread.sleep(1000);
                } else {
                    LOGGER.info("-------------------------- 有數據啦 -----------------------");
                    printEntry(message.getEntries());
                }

                connector.ack(batchID);

            } catch (Exception e) {
                LOGGER.error("處理出錯");
            } finally {
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * 獲取每條打印的記錄
     */
    public static void printEntry(List<Entry> entrys) {

        for (Entry entry : entrys) {

            // 第一步:拆解entry 實體
            Header header = entry.getHeader();
            EntryType entryType = entry.getEntryType();

            // 第二步: 如果當前是RowData,那就是我需要的數據
            if (entryType == EntryType.ROWDATA) {

                String tableName = header.getTableName();
                String schemaName = header.getSchemaName();

                RowChange rowChange = null;

                try {
                    rowChange = RowChange.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    e.printStackTrace();
                }

                EventType eventType = rowChange.getEventType();

                LOGGER.info(String.format("當前正在操作表 %s.%s, 執行操作= %s", schemaName, tableName, eventType));

                // 如果是‘查詢’ 或者 是 ‘DDL’ 操作,那么sql直接打出來
                if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
                    LOGGER.info("執行了查詢語句:[{}]", rowChange.getSql());
                    return;
                }

                // 第三步:追蹤到 columns 級別
                rowChange.getRowDatasList().forEach((rowData) -> {

                    // 獲取更新之前的column情況
                    List<Column> beforeColumns = rowData.getBeforeColumnsList();

                    // 獲取更新之后的 column 情況
                    List<Column> afterColumns = rowData.getAfterColumnsList();

                    // 當前執行的是 刪除操作
                    if (eventType == EventType.DELETE) {
                        printColumn(beforeColumns);
                    }

                    // 當前執行的是 插入操作
                    if (eventType == EventType.INSERT) {
                        printColumn(afterColumns);
                    }

                    // 當前執行的是 更新操作
                    if (eventType == EventType.UPDATE) {
                        printColumn(afterColumns);
                        // 進行刪除緩存操作
                        deleteCache(afterColumns, tableName, schemaName);
                    }


                });
            }
        }
    }

    /**
     * 每個row上面的每一個column 的更改情況
     * @param columns
     */
    public static void printColumn(List<Column> columns) {

        columns.forEach((column) -> {
            String columnName = column.getName();
            String columnValue = column.getValue();
            String columnType = column.getMysqlType();
            // 判斷 該字段是否更新
            boolean isUpdated = column.getUpdated();
            LOGGER.info(String.format("數據列:columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s", columnName, columnValue, columnType, isUpdated));
        });
    }

    /**
     * 秒殺下單接口刪除庫存緩存
     */
    public static void deleteCache(List<Column> columns, String tableName, String schemaName) {
        if ("stock".equals(tableName) && "m4a_miaosha".equals(schemaName)) {
            AtomicInteger id = new AtomicInteger();
            columns.forEach((column) -> {
                String columnName = column.getName();
                String columnValue = column.getValue();
                if ("id".equals(columnName)) {
                    id.set(Integer.parseInt(columnValue));
                }
            });
            // TODO: 刪除緩存
            LOGGER.info("Canal刪除stock表id:[{}] 的庫存緩存", id);

        }
    }
}

代碼中有詳細的注釋,就不做解釋了。

我們跑起代碼,緊接着我們在數據庫中進行更改UPDATE操作,把法外狂徒張三改成張三1,然后再改回張三,見下圖。

image

Canal成功收到了兩條更新操作:

image

緊接着我們模擬一個刪除Cache緩存的業務,在代碼中有:

/**
 * 秒殺下單接口刪除庫存緩存
 */
public static void deleteCache(List<Column> columns, String tableName, String schemaName) {
    if ("stock".equals(tableName) && "m4a_miaosha".equals(schemaName)) {
        AtomicInteger id = new AtomicInteger();
        columns.forEach((column) -> {
            String columnName = column.getName();
            String columnValue = column.getValue();
            if ("id".equals(columnName)) {
                id.set(Integer.parseInt(columnValue));
            }
        });
        // TODO: 刪除緩存
        LOGGER.info("Canal刪除stock表id:[{}] 的庫存緩存", id);

    }
}

在上面的代碼中,在收到m4a_miaosha.stock表的更新操作后,我們刷新庫存緩存。效果如下:

image

image

簡單的Canal使用就介紹到這里,剩下的發揮空間留給各位讀者大大們。

總結

本文總結了Canal的基本原理和簡單的使用。

總結如下幾點:

  • Canal實際是將自己偽裝成數據庫的從庫,來讀取主數據庫發來的Binlog。
  • Canal用途很廣,比如數據庫實時備份、索引構建和實時維護(拆分異構索引、倒排索引等)、業務 cache 緩存刷新。
  • Canal可以推送至非常多數據源,並支持推送到消息隊列,方便多語言使用。

希望大家多多支持我的原創技術文章公眾號:后端技術漫談,我最全的原創文章都在這里首發。

參考

關注我

我是一名后端開發工程師。主要關注后端開發,數據安全,爬蟲,物聯網,邊緣計算等方向,歡迎交流。

各大平台都可以找到我

原創文章主要內容

  • 后端開發相關技術文章
  • Java面試復習手冊
  • 設計模式/數據結構/LeetCode算法題解
  • 爬蟲/邊緣計算相關技術文章
  • 逸聞趣事/好書分享/個人生活

個人公眾號:后端技術漫談

個人公眾號:后端技術漫談

如果文章對你有幫助,不妨收藏,轉發,在看起來~


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM