基於Canal和Kafka實現MySQL的Binlog近實時同步


前提

近段時間,業務系統架構基本完備,數據層面的建設比較薄弱,因為筆者目前工作重心在於搭建一個小型的數據平台。優先級比較高的一個任務就是需要近實時同步業務系統的數據(包括保存、更新或者軟刪除)到一個另一個數據源,持久化之前需要清洗數據並且構建一個相對合理的便於后續業務數據統計、標簽系統構建等擴展功能的數據模型。基於當前團隊的資源和能力,優先調研了Alibaba開源中間件Canal的使用。

這篇文章簡單介紹一下如何快速地搭建一套Canal相關的組件。

關於Canal

簡介

下面的簡介和下一節的原理均來自於Canal項目的README

Canal[kə'næl],譯意為水道/管道/溝渠,主要用途是基於MySQL數據庫增量日志解析,提供增量數據訂閱和消費。早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基於業務trigger獲取增量變更。從 2010 年開始,業務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。

基於日志增量訂閱和消費的業務包括:

  • 數據庫鏡像
  • 數據庫實時備份
  • 索引構建和實時維護(拆分異構索引、倒排索引等)
  • 業務Cache刷新
  • 帶業務邏輯的增量數據處理

Canal的工作原理

MySQL主備復制原理:

  • MySQLMaster實例將數據變更寫入二進制日志(binary log,其中記錄叫做二進制日志事件binary log events,可以通過show binlog events進行查看)
  • MySQLSlave實例將masterbinary log events拷貝到它的中繼日志(relay log
  • MySQLSlave實例重放relay log中的事件,將數據變更反映它到自身的數據

Canal的工作原理如下:

  • Canal模擬MySQL Slave的交互協議,偽裝自己為MySQL Slave,向MySQL Master發送dump協議
  • MySQL Master收到dump請求,開始推送binary logSlave(即Canal
  • Canal解析binary log對象(原始為byte流),並且可以通過連接器發送到對應的消息隊列等中間件中

關於Canal的版本和部件

截止筆者開始編寫本文的時候(2020-03-05),Canal的最新發布版本是v1.1.5-alpha-12019-10-09發布的),最新的正式版是v1.1.42019-09-02發布的)。其中,v1.1.4主要添加了鑒權、監控的功能,並且做了一些列的性能優化,此版本集成的連接器是TcpKafkaRockerMQ。而v1.1.5-alpha-1版本已經新增了RabbitMQ連接器,但是此版本的RabbitMQ連接器暫時不能定義連接RabbitMQ的端口號,不過此問題已經在master分支中修復(具體可以參看源碼中的CanalRabbitMQProducer類的提交記錄)。換言之,v1.1.4版本中目前能使用的內置連接器只有TcpKafkaRockerMQ三種,如果想嘗鮮使用RabbitMQ連接器,可以選用下面的兩種方式之一:

  • 選用v1.1.5-alpha-1版本,但是無法修改RabbitMQport屬性,默認為5672
  • 基於master分支自行構建Canal

目前,Canal項目的活躍度比較高,但是考慮到功能的穩定性問題,筆者建議選用穩定版本在生產環境中實施,當前可以選用v1.1.4版本,本文的例子用選用的就是v1.1.4版本,配合Kafka連接器使用Canal主要包括三個核心部件:

  • canal-admin:后台管理模塊,提供面向WebUICanal管理能力。
  • canal-adapter:適配器,增加客戶端數據落地的適配及啟動功能,包括REST、日志適配器、關系型數據庫的數據同步(表對表同步)、HBase數據同步、ES數據同步等等。
  • canal-deployer:發布器,核心功能所在,包括binlog解析、轉換和發送報文到連接器中等等功能都由此模塊提供。

一般情況下,canal-deployer部件是必須的,其他兩個部件按需選用即可。

部署所需的中間件

搭建一套可以用的組件需要部署MySQLZookeeperKafkaCanal四個中間件的實例,下面簡單分析一下部署過程。選用的虛擬機系統是CentOS7

安裝MySQL

為了簡單起見,選用yum源安裝(官方鏈接是https://dev.mysql.com/downloads/repo/yum):

::: info
mysql80-community-release-el7-3雖然包名帶了mysql80關鍵字,其實已經集成了MySQL主流版本5.6、5.7和8.x等等的最新安裝包倉庫
:::

選用的是最新版的MySQL8.x社區版,下載CentOS7適用的rpm包

cd /data/mysql
wget https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm
// 下載完畢之后
sudo rpm -Uvh mysql80-community-release-el7-3.noarch.rpm

此時列舉一下yum倉庫里面的MySQL相關的包:

[root@localhost mysql]# yum repolist all | grep mysql
mysql-cluster-7.5-community/x86_64 MySQL Cluster 7.5 Community   disabled
mysql-cluster-7.5-community-source MySQL Cluster 7.5 Community - disabled
mysql-cluster-7.6-community/x86_64 MySQL Cluster 7.6 Community   disabled
mysql-cluster-7.6-community-source MySQL Cluster 7.6 Community - disabled
mysql-cluster-8.0-community/x86_64 MySQL Cluster 8.0 Community   disabled
mysql-cluster-8.0-community-source MySQL Cluster 8.0 Community - disabled
mysql-connectors-community/x86_64  MySQL Connectors Community    enabled:    141
mysql-connectors-community-source  MySQL Connectors Community -  disabled
mysql-tools-community/x86_64       MySQL Tools Community         enabled:    105
mysql-tools-community-source       MySQL Tools Community - Sourc disabled
mysql-tools-preview/x86_64         MySQL Tools Preview           disabled
mysql-tools-preview-source         MySQL Tools Preview - Source  disabled
mysql55-community/x86_64           MySQL 5.5 Community Server    disabled
mysql55-community-source           MySQL 5.5 Community Server -  disabled
mysql56-community/x86_64           MySQL 5.6 Community Server    disabled
mysql56-community-source           MySQL 5.6 Community Server -  disabled
mysql57-community/x86_64           MySQL 5.7 Community Server    disabled
mysql57-community-source           MySQL 5.7 Community Server -  disabled
mysql80-community/x86_64           MySQL 8.0 Community Server    enabled:    161
mysql80-community-source           MySQL 8.0 Community Server -  disabled

編輯/etc/yum.repos.d/mysql-community.repo文件([mysql80-community]塊中enabled設置為1,其實默認就是這樣子,不用改,如果要選用5.x版本則需要修改對應的塊):

[mysql80-community]
name=MySQL 8.0 Community Server
baseurl=http://repo.mysql.com/yum/mysql-8.0-community/el/7/$basearch/
enabled=1
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql

然后安裝MySQL服務:

sudo yum install mysql-community-server

這個過程比較漫長,因為需要下載和安裝5個rpm安裝包(或者是所有安裝包組合的壓縮包mysql-8.0.18-1.el7.x86_64.rpm-bundle.tar)。如果網絡比較差,也可以直接從官網手動下載后安裝:

// 下載下面5個rpm包 common --> libs --> libs-compat --> client --> server
mysql-community-common
mysql-community-libs
mysql-community-libs-compat
mysql-community-client
mysql-community-server

// 強制安裝
rpm -ivh mysql-community-common-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-libs-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-libs-compat-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-client-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-server-8.0.18-1.el7.x86_64.rpm --force --nodeps

安裝完畢之后,啟動MySQL服務,然后搜索MySQL服務的root賬號的臨時密碼用於首次登陸(mysql -u root -p):

// 啟動服務,關閉服務就是service mysqld stop
service mysqld start
// 查看臨時密碼 cat /var/log/mysqld.log
[root@localhost log]# cat /var/log/mysqld.log 
2020-03-02T06:03:53.996423Z 0 [System] [MY-013169] [Server] /usr/sbin/mysqld (mysqld 8.0.18) initializing of server in progress as process 22780
2020-03-02T06:03:57.321447Z 5 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: >kjYaXENK6li
2020-03-02T06:04:00.123845Z 0 [System] [MY-010116] [Server] /usr/sbin/mysqld (mysqld 8.0.18) starting as process 22834
// 登錄臨時root用戶,使用臨時密碼
[root@localhost log]# mysql -u root -p

接下來做下面的操作:

  • 修改root用戶的密碼:ALTER USER 'root'@'localhost' IDENTIFIED BY 'QWqw12!@';(注意密碼規則必須包含大小寫字母、數字和特殊字符)
  • 更新roothost,切換數據庫use mysql;,指定host%以便可以讓其他服務器遠程訪問UPDATE USER SET HOST = '%' WHERE USER = 'root';
  • 賦予'root'@'%'用戶,所有權限,執行GRANT ALL PRIVILEGES ON *.* TO 'root'@'%';
  • 改變root'@'%用戶的密碼校驗規則以便可以使用Navicat等工具訪問:ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';

操作完成之后,就可以使用root用戶遠程訪問此虛擬機上的MySQL服務。最后確認是否開啟了binlog(注意一點是MySQL8.x默認開啟binlogSHOW VARIABLES LIKE '%bin%';

最后在MySQLShell執行下面的命令,新建一個用戶名canal密碼為QWqw12!@的新用戶,賦予REPLICATION SLAVEREPLICATION CLIENT權限:

CREATE USER canal IDENTIFIED BY 'QWqw12!@';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';

切換回去root用戶,創建一個數據庫test

CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`;

安裝Zookeeper

CanalKafka集群都依賴於Zookeeper做服務協調,為了方便管理,一般會獨立部署Zookeeper服務或者Zookeeper集群。筆者這里選用2020-03-04發布的3.6.0版本:

midkr /data/zk
# 創建數據目錄
midkr /data/zk/data
cd /data/zk
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz
cd apache-zookeeper-3.6.0-bin/conf
cp zoo_sample.cfg zoo.cfg && vim zoo.cfg

zoo.cfg文件中的dataDir設置為/data/zk/data,然后啟動Zookeeper

[root@localhost conf]# sh /data/zk/apache-zookeeper-3.6.0-bin/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/zk/apache-zookeeper-3.6.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

這里注意一點,要啟動此版本的Zookeeper服務必須本地安裝好JDK8+,這一點需要自行處理。啟動的默認端口是2181,啟動成功后的日志如下:

安裝Kafka

Kafka是一個高性能分布式消息隊列中間件,它的部署依賴於Zookeeper。筆者在此選用2.4.0並且Scala版本為2.13的安裝包:

mkdir /data/kafka
mkdir /data/kafka/data
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.13-2.4.0.tgz
tar -zxvf kafka_2.13-2.4.0.tgz

由於解壓后/data/kafka/kafka_2.13-2.4.0/config/server.properties配置中對應的zookeeper.connect=localhost:2181已經符合需要,不必修改,需要修改日志文件的目錄log.dirs/data/kafka/data。然后啟動Kafka服務:

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh /data/kafka/kafka_2.13-2.4.0/config/server.properties

這樣啟動一旦退出控制台就會結束Kafka進程,可以添加-daemon參數用於控制Kafka進程后台不掛斷運行。

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.13-2.4.0/config/server.properties

安裝和使用Canal

終於到了主角登場,這里選用Canalv1.1.4穩定發布版,只需要下載deployer模塊:

mkdir /data/canal
cd /data/canal
# 這里注意一點,Github在國內被牆,下載速度極慢,可以先用其他下載工具下載完再上傳到服務器中
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
tar -zxvf canal.deployer-1.1.4.tar.gz

解壓后的目錄如下:

- bin   # 運維腳本
- conf  # 配置文件
  canal_local.properties  # canal本地配置,一般不需要動
  canal.properties        # canal服務配置
  logback.xml             # logback日志配置
  metrics                 # 度量統計配置
  spring                  # spring-實例配置,主要和binlog位置計算、一些策略配置相關,可以在canal.properties選用其中的任意一個配置文件
  example                 # 實例配置文件夾,一般認為單個數據庫對應一個獨立的實例配置文件夾
    instance.properties   # 實例配置,一般指單個數據庫的配置
- lib   # 服務依賴包
- logs  # 日志文件輸出目錄

在開發和測試環境建議把logback.xml的日志級別修改為DEBUG方便定位問題。這里需要關注canal.propertiesinstance.properties兩個配置文件。canal.properties文件中,需要修改:

  • 去掉canal.instance.parser.parallelThreadSize = 16這個配置項的注釋,也就是啟用此配置項,和實例解析器的線程數相關,不配置會表現為阻塞或者不進行解析。
  • canal.serverMode配置項指定為kafka,可選值有tcpkafkarocketmqmaster分支或者最新的的v1.1.5-alpha-1版本,可以選用rabbitmq),默認是kafka
  • canal.mq.servers配置需要指定為Kafka服務或者集群Broker的地址,這里配置為127.0.0.1:9092

canal.mq.servers在不同的canal.serverMode有不同的意義。
kafka模式下,指Kafka服務或者集群Broker的地址,也就是bootstrap.servers
rocketmq模式下,指NameServer列表
rabbitmq模式下,指RabbitMQ服務的Host和Port

其他配置項可以參考下面兩個官方Wiki的鏈接:

instance.properties一般指一個數據庫實例的配置,Canal架構支持一個Canal服務實例,處理多個數據庫實例的binlog異步解析。instance.properties需要修改的配置項主要包括:

  • canal.instance.mysql.slaveId需要配置一個和Master節點的服務ID完全不同的值,這里筆者配置為654321
  • 配置數據源實例,包括地址、用戶、密碼和目標數據庫:
    • canal.instance.master.address,這里指定為127.0.0.1:3306
    • canal.instance.dbUsername,這里指定為canal
    • canal.instance.dbPassword,這里指定為QWqw12!@
    • 新增canal.instance.defaultDatabaseName,這里指定為test(需要在MySQL中建立一個test數據庫,見前面的流程)。
  • Kafka相關配置,這里暫時使用靜態topic和單個partition
    • canal.mq.topic,這里指定為test也就是解析完的binlog結構化數據會發送到Kafka的命名為testtopic
    • canal.mq.partition,這里指定為0

配置工作做好之后,可以啟動Canal服務:

sh /data/canal/bin/startup.sh 
# 查看服務日志
tail -100f /data/canal/logs/canal/canal
# 查看實例日志  -- 一般情況下,關注實例日志即可
tail -100f /data/canal/logs/example/example.log

啟動正常后,見實例日志如下:

test數據庫創建一個訂單表,並且執行幾個簡單的DML

use `test`;

CREATE TABLE `order`
(
    id          BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵',
    order_id    VARCHAR(64)    NOT NULL COMMENT '訂單ID',
    amount      DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '訂單金額',
    create_time DATETIME       NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',
    UNIQUE uniq_order_id (`order_id`)
) COMMENT '訂單表';

INSERT INTO `order`(order_id, amount) VALUES ('10086', 999);
UPDATE `order` SET amount = 10087 WHERE order_id = '10086';
DELETE  FROM `order` WHERE order_id = '10086';

這個時候,可以利用Kafkakafka-console-consumer或者Kafka Tools查看test這個topic的數據:

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test

具體的數據如下:

// test數據庫建庫腳本
{"data":null,"database":"`test`","es":1583143732000,"id":1,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`","sqlType":null,"table":"","ts":1583143930177,"type":"QUERY"}

// order表建表DDL
{"data":null,"database":"test","es":1583143957000,"id":2,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `order`\n(\n    id          BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵',\n    order_id    VARCHAR(64)    NOT NULL COMMENT '訂單ID',\n    amount      DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '訂單金額',\n    create_time DATETIME       NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '創建時間',\n    UNIQUE uniq_order_id (`order_id`)\n) COMMENT '訂單表'","sqlType":null,"table":"order","ts":1583143958045,"type":"CREATE"}

// INSERT
{"data":[{"id":"1","order_id":"10086","amount":"999.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143969000,"id":3,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143969460,"type":"INSERT"}

// UPDATE
{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143974000,"id":4,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":[{"amount":"999.0"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143974870,"type":"UPDATE"}

// DELETE
{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143980000,"id":5,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143981091,"type":"DELETE"}

可見Kafka的名為testtopic已經寫入了對應的結構化binlog事件數據,可以編寫消費者監聽Kafka對應的topic然后對獲取到的數據進行后續處理。

小結

這篇文章大部分篇幅用於介紹其他中間件是怎么部署的,這個問題側面說明了Canal本身部署並不復雜,它的配置文件屬性項比較多,但是實際上需要自定義和改動的配置項是比較少的,也就是說明了它的運維成本和學習成本並不高。后面會分析基於結構化binlog事件做ELT和持久化相關工作以及Canal的生產環境可用級別HA集群的搭建。

參考資料:

個人博客

(本文完 c-3-d e-a-20200306)

技術公眾號(《Throwable文摘》),不定期推送筆者原創技術文章(絕不抄襲或者轉載):


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM