第 一 章 Canal 入門
1.1 什么是 Canal
阿里巴巴 B2B 公司,因為業務的特性,賣家主要集中在國內,買家主要集中在國外,所以衍生出了同步杭州和美國異地機房的需求,從 2010 年開始,阿里系公司開始逐步的嘗試基於數據庫的日志解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務。
canal [kə'næl],譯意為水道/管道/溝渠。
Canal 是用 Java 開發的基於數據庫增量日志解析,提供增量數據訂閱&消費的中間件。
目前。Canal 主要支持了 MySQL 的 Binlog 解析,解析完成后才利用 Canal Client 來處理獲得的相關數據。(數據庫同步需要阿里的 Otter 中間件,基於Canal)。
1.2 MySQL 的 Binlog
1.2.1 什么是 Binlog
MySQL 的二進制日志可以說 MySQL 最重要的日志了,它記錄了所有的 DDL 和 DML(除了數據查詢語句)語句,以事件形式記錄,還包含語句所執行的消耗的時間,MySQL 的二進制日志是事務安全型的。
一般來說開啟二進制日志大概會有 1%的性能損耗。二進制有兩個最重要的使用場景:
- MySQL Replication 在 Master 端開啟 Binlog,Master 把它的二進制日志傳遞給 Slaves來達到 Master-Slave 數據一致的目的。
- 自然就是數據恢復了,通過使用 MySQL Binlog 工具來使恢復數據。
二進制日志包括兩類文件:二進制日志索引文件(文件名后綴為.index)用於記錄所有的二進制文件,二進制日志文件(文件名后綴為.00000*)記錄數據庫所有的 DDL 和 DML(除了數據查詢語句)語句事件。
1.2.2 Binlog 的分類
MySQL Binlog 的格式有三種,分別是 STATEMENT,MIXED,ROW。在配置文件中可以選擇配置 binlog_format= statement|mixed|row。三種格式的區別:
1)statement:語句級,binlog 會記錄每次一執行寫操作的語句。相對 row 模式節省空間,但是可能產生不一致性,比如“update tt set create_date=now()”,如果用 binlog 日志進行恢復,由於執行時間不同可能產生的數據就不同。
- 優點:節省空間。
- 缺點:有可能造成數據不一致。
2)row:行級, binlog 會記錄每次操作后每行記錄的變化。
- 優點:保持數據的絕對一致性。因為不管 sql 是什么,引用了什么函數,他只記錄執行后的效果。
- 缺點:占用較大空間。
3)mixed:statement 的升級版,一定程度上解決了,因為一些情況而造成的 statement模式不一致問題,默認還是 statement,在某些情況下譬如:當函數中包含 UUID() 時;包含AUTO_INCREMENT 字段的表被更新時;執行 INSERT DELAYED 語句時;用 UDF 時;會按照ROW 的方式進行處理
- 優點:節省空間,同時兼顧了一定的一致性。
- 缺點:還有些極個別情況依舊會造成不一致,另外 statement 和 mixed 對於需要對binlog 的監控的情況都不方便。
綜合上面對比,若使用Canal 想做實時監控分析,直接將statement的SQL語句發送給Flink或SparkStream是不合適的,因為沒有SQL引擎,選擇 row 格式比較合適。
1.3 Canal 的工作原理
1.3.1 MySQL 主從復制過程
1)Master 主庫將改變記錄(二進制日志事件binary log events),寫到二進制日志(Binary Log)中;
2)Slave 從庫向 MySQL Master 發送 dump 協議,將 Master 主庫的 binary log events 拷貝到它的中繼日志(relay log);
3)Slave 從庫讀取並重做中繼日志中的事件,將改變的數據同步到自己的數據庫。
1.3.2 Canal 的工作原理
Canal的工作原理就是把自己偽裝成 Slave,假裝從 Master 復制數據。
- canal 模擬 MySQL Slave 的交互協議,偽裝自己為 MySQL Slave ,向 MySQL master 發送dump 協議
- MySQL Master 收到 dump 請求,開始推送 binary log 給 Slave (即 canal )
- Canal 解析 binary log 對象(原始為 byte 流)

1.4 使用場景
1)原始場景: 阿里 Otter 中間件的一部分
Otter 是阿里用於進行異地數據庫之間的同步框架,Canal 是其中一部分。
2)常見場景 1:更新緩存
3)常見場景 2:抓取業務表的新增變化數據,用於制作實時統計(我們就是這種場景)
第二章 MySQL准備
2.1 創建數據庫
2.2 創建數據表
CREATE TABLE user_info(
`id` VARCHAR(255),
`name` VARCHAR(255),
`sex` VARCHAR(255)
);
2.3 修改配置文件開啟Binlog
[wkf@hadoop102 ~]$ sudo vim /etc/my.cnf
server-id=1
log-bin=mysql-bin
binlog_format=row
binlog-do-db=gmall-2022
注意:binlog-do-db 根據自己的情況進行修改,指定具體要同步的數據庫,如果不配置則表示所有數據庫均開啟Binlog
2.4 重啟 MySQL使配置生效
sudo systemctl restart mysqld
到/var/lib/mysql目錄下查看初始文件大小 154
[wkf@hadoop102 lib]$ pwd
/var/lib
2.5 測試 Binlog是否開啟
1)插入數據
INSERT INTO user_info VALUES('1001','zhangsan','male');
2)再次到 /var/lib/mysql目錄下,查看 index文件的大小
2.6 賦權限
在MySQL中執行
mysql> set global validate_password_length=4;
mysql> set global validate_password_policy=0;
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal'
第三章 Canal 的下載和安裝
3.1 下載並解壓 Jar包
https://github.com/alibaba/canal/releases
下載的 canal.deployer-1.1.2.tar.gz拷貝到 /opt/sortware目錄下,然后解壓到 /opt/module/canal包下
注意:canal解壓后是分散的,我們在指定解壓目錄的時候需要將 canal指定上
[wkf@hadoop 102 software]$ mkdir /opt/module/canal
[wkf@hadoop 102 software]$ tar zxvf canal.deployer-1.1.2.tar.gz -C /opt/module/canal
3.2 修改 canal.properties的配置
[wkf@hadoop102 conf]$ pwd
/opt/module/canal/conf
[wkf@hadoop102 conf]$ vim canal.properties
######### common argument #############
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = tcp
# flush meta cursor/parse position to file
說明:
這個文件是 canal的基本通用配置, canal端口號默認就是 11111 修改 canal的輸出 model,默認 tcp,kafka模式改為輸出到 kafka
多實例配置如果創建多個實例 通過前面 canal架構,我們可以知道,一個 canal服務中可以有多個 instance conf/下的每一個 example即是一個實例,每個實例下面都有獨立的配置文件。默認只有一個實例 example,如果需要多個實例處理不同的 MySQL數據的話,直接拷貝出多個 example,並對其重新命名,命名和配置文件中指定的名稱一致,然后修改canal.properties中的 canal.destinations=實例 1,實例 2,實例 3。
######### destinations #############
canal.destinations = example
3.3 修改 instance.properties
我們這里只讀取一個MySQL數據,所以只有一個實例,這個實例的配置文件在
conf/example目錄下
[wkf@hadoop 102 example]$ pwd
/opt/module/canal/conf/example
[wkf@hadoop 102 example]$ vim instance.properties
1)配置 MySQL服務器地址
## mysql serverId , v1.0.26+ will autoGen
canal.instance.mysql.slaveId=20
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=hadoop102:3306
2)配置連接 MySQL的用戶名和密碼,默認就是我們前面授權的 canal
# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.instance.defaultDatabaseName =test
# enable druid Decrypt database password
canal.instance.enableDruid=false
第四章 實時監控測試
4.1 TCP模式測試
4.1.1 創建 gmall-canal項目
4.1.2 在 gmall-canal模塊中配置 pom.xml
<dependencies>
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.4.1</version>
</dependency>
</dependencies>
4.1.3 通用監視類 CanalClient
1)Canal 封裝的數據結構
2)在gmall-canal 模塊下創建com.atguigu.app 包,並在包下創建CanalClient(java 代碼)代碼如下:
package com.atguigu.app;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import java.net.InetSocketAddress;
import java.util.List;
public class CanalClient {
public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {
//TODO 獲取連接
CanalConnector canalConnector = CanalConnectors.newSingleConnector(new InetSocketAddress("hadoop102", 11111), "example", "root", "123456");
while (true) {
//TODO 連接
canalConnector.connect();
//TODO 訂閱數據庫
canalConnector.subscribe("gmall-2022.*");
//TODO 獲取數據
Message message = canalConnector.get(100);
//TODO 獲取Entry集合
List<CanalEntry.Entry> entries = message.getEntries();
//TODO 判斷集合是否為空,如果為空,則等待一會繼續拉取數據
if (entries.size() <= 0) {
System.out.println("當次抓取沒有數據,休息一會。。。。。。");
Thread.sleep(1000);
} else {
//TODO 遍歷entries,單條解析
for (CanalEntry.Entry entry : entries) {
//1.獲取表名
String tableName = entry.getHeader().getTableName();
//2.獲取類型
CanalEntry.EntryType entryType = entry.getEntryType();
//3.獲取序列化后的數據
ByteString storeValue = entry.getStoreValue();
//4.判斷當前entryType類型是否為ROWDATA
if (CanalEntry.EntryType.ROWDATA.equals(entryType)) {
//5.反序列化數據
CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);
//6.獲取當前事件的操作類型
CanalEntry.EventType eventType = rowChange.getEventType();
//7.獲取數據集
List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
//8.遍歷rowDataList,並打印數據集
for (CanalEntry.RowData rowData : rowDataList) {
JSONObject beforeData = new JSONObject();
List<CanalEntry.Column> beforeColumnsList = rowData.getBeforeColumnsList();
for (CanalEntry.Column column : beforeColumnsList) {
beforeData.put(column.getName(), column.getValue());
}
JSONObject afterData = new JSONObject();
List<CanalEntry.Column> afterColumnsList = rowData.getAfterColumnsList();
for (CanalEntry.Column column : afterColumnsList) {
afterData.put(column.getName(), column.getValue());
}
//數據打印
System.out.println("Table:" + tableName +
",EventType:" + eventType +
",Before:" + beforeData +
",After:" + afterData);
}
} else {
System.out.println("當前操作類型為:" + entryType);
}
}
}
}
}
}
開啟Canal服務端
[wkf@hadoop102 canal]$ bin/startup.sh
運行客戶端,運行過程中向數據庫插入一條數據
4.2 Kafka模式測試
- 修改 canal.properties中 canal的輸出 model,默認 tcp,改為輸出到 kafka
######### common argument #############
canal.id = 1
canal.ip =
canal.port = 11111
canal.metrics.pull.po rt = 11112
canal.zkServers =
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = kafka
# flush meta cursor/parse position to file
- 修改 Kafka集群的地址
######### MQ
canal.mq.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
- 修改 instance.properties輸出到 Kafka的主題以及分區數
# mq conf ig
canal.mq.topic= canal_ test
canal.mq.partitionsNum=1
# hash partition config
#canal.mq.partition=0
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
注意:默認還是輸出到指定Kafka主題的一個 kafka分區,因為多個分區並行可能會打亂 binlog的順序 如果要提高並行度,首先設置 kafka的分區數 >1,然后設置
canal.mq.partitionHash屬性
- 啟動 Canal
[wkf@hadoop102 example]$ cd /opt/module/
[wkf@hadoop102 canal]$ bin/startup.sh
- 看到 CanalLauncher你表示啟動成功,同時會創建 canal_test主題
[wkf@hadoop102 canal]$ jps
2269 Jps
2253 CanalLauncher
- 啟動 Kafka消費客戶端測試,查看消費情況
[wkf@hadoop 102 kafka]$ bin/kafka console consumer.sh bootstrap server hadoop102:9092 topic canal_test
7)向 MySQL中插入數據后查看消費者控制台
插入數據
INSERT INTO user_info VALUES('1001','zhangsan','male'),('1002','lisi','female');
Kafka 消費者控制台
{"data":[{"id":"1001","name":"zhangsan","sex":"male"},{"id":"1002","name":"lisi","sex":"female"}],"database":"gmall2022","es":1639360729000,"id":1,"isDdl":false,"mysqlType":{"id":"varchar(255)","name":"varchar(255)","sex":"varchar(255)"},"old":null,"sql":"","sqlType":{"id":12,"name":12,"sex":12},"table":"user_info","ts":1639361038454,"type":"INSERT"}