一、基本概念
mysql本身支持主從備份,原理就是主庫master生成的binlog文件記錄了所有的增刪改操作語句,然后slave向master發送dump協議,master將binlog日志文件推送給從庫slave解析執行,達到數據一致備份的目的。
canal,基於java開發,偽裝成一個slave,去監聽獲取增量的binlog日志文件,然后解析處理獲得的相關數據(過程中可以加入自由的加入一些額外的功能性代碼需求),利用獲得的數據,可以用其他不同用途,比如同步到es中做搜索相關。
二、canal基本配置使用
測試環境:windows、mysql 5.7.26、canal 1.1.3、Navicat for MySQL。
1、mysql安裝和配置
1.1、下載安裝解壓忽略。進入mysql解壓后目錄,新增data文件夾。
1.2、新增my.ini文件,添加配置:
[client] # 設置mysql客戶端連接服務端時默認使用的端口 port=3311 [mysql] default-character-set=utf8 [mysqld] character-set-server=utf8 port=3311 # 默認存儲引擎innoDB default-storage-engine=INNODB # Server Id.數據庫服務器id,這個id用來在主從服務器中標記唯一mysql服務器 server-id=1 datadir=E:\\soft\\mysql2\\data bind-address=0.0.0.0 # 開啟binlog日志 log-bin=mysql-bin binlog_format = ROW
1.3、cmd進入並目錄,啟動/關閉 mysql:
//啟動 net start mysql //關閉 net stop mysql
1.4、連接mysql並設置密碼
連接:mysql -uroot -p,初始密碼為空,一直按enter即可進入mysql命令行。
進入后設置密碼:
// 切換庫
use mysql;
// 設置密碼
update user set authentication_string=PASSWORD("123456") where user="root";
// 刷新生效
flush privileges;
設置成功后,quit退出重進,輸入密碼123456。
1.5、新增個canal的訪問賬戶
// 新增用戶 CREATE USER 'canal'@'%' IDENTIFIED BY 'canal'; // 授權 GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; // 刷新 FLUSH PRIVILEGES;
2、canal安裝配置
下載canal包(https://github.com/alibaba/canal/releases),解壓本地目錄。
2.1、目錄結構

其中cfang是拷貝example,需要多個instance可繼續拷貝,再修改每個instance中的配置文件。
2.1、配置canal.properties

port可自定義,用於canal對外服務接口。destinations配置instance列表(連接db)。
2.2、配置instance.properties

其中canal.instance.defaultDatabaseName可不配置,全庫掃描。
2.3、啟動
bin目錄,點擊startup.bat,查看/logs/canal/canal.log日志文件,出現以下則為開啟成功:

2.4、canal數據格式:
Entry
Header
logfileName [binlog文件名]
logfileOffset [binlog position]
executeTime [發生的變更]
schemaName
tableName
eventType [insert/update/delete類型]
entryType [事務頭BEGIN/事務尾END/數據ROWDATA]
storeValue [byte數據,可展開,對應的類型為RowChange]
RowChange
isDdl [是否是ddl變更操作,比如create table/drop table]
sql [具體的ddl sql]
rowDatas [具體insert/update/delete的變更數據,可為多條,1個binlog event事件可對應多條變更,比如批處理]
beforeColumns [Column類型的數組]
afterColumns [Column類型的數組]
Column
index
sqlType [jdbc type]
name [column name]
isKey [是否為主鍵]
updated [是否發生過變更]
isNull [值是否為null]
value [具體的內容,注意為文本]
2.5、java程序測試
pom導入:
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.3</version> </dependency>
java測試:
package com.cfang.prebo;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.stream.Collectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;
public class CanalTest {
public static void main(String[] args) throws Exception {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "cfang", "", "");
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(100); // 獲取指定數量的數據
long batchId = message.getId();
if (batchId == -1 || message.getEntries().isEmpty()) {
Thread.sleep(1000);
continue;
}
// System.out.println(message.getEntries());
printEntries(message.getEntries());
connector.ack(batchId);// 提交確認,消費成功,通知server刪除數據
// connector.rollback(batchId);// 處理失敗, 回滾數據,后續重新獲取數據
}
}
private static void printEntries(List<Entry> entries) throws Exception {
for (Entry entry : entries) {
if (entry.getEntryType() != EntryType.ROWDATA) {
continue;
}
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChange.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
for (RowData rowData : rowChange.getRowDatasList()) {
switch (rowChange.getEventType()) {
case INSERT:
System.out.println("INSERT ");
printColumns(rowData.getAfterColumnsList());
break;
case UPDATE:
System.out.println("UPDATE ");
printColumns(rowData.getAfterColumnsList());
break;
case DELETE:
System.out.println("DELETE ");
printColumns(rowData.getBeforeColumnsList());
break;
default:
break;
}
}
}
}
private static void printColumns(List<Column> columns) {
for(Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
Navicat中進行相關操作的時候,可在控台看到輸出,例如:

