canal是阿里巴巴的來源項目。我們可以通過配置binlog實現數據庫監控,得到數據庫表或者數據的更新信息。
參考我的文檔前先去官網看下,可能已經支持更高版本的MySQL了
1. 查看官方開源項目
https://github.com/alibaba/canal
2. 下載最新的canal.deployer-XXXX-SNAPSHOT.tar.gz
https://github.com/alibaba/canal/releases
3. 查看wiki
ps. 目前內部版本已經支持mysql和oracle部分版本的日志解析,當前的canal開源版本支持5.7及以下的版本(阿里內部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)
所以需要安裝mysql5.7以下的版本穩妥些
4. 去MySQL官網下載mysql並安裝
5. 查看mysql安裝目錄
6. 復制一個my-default.ini改名叫my.ini
修改對應位置:
添加:
#添加這一行就ok log-bin=mysql-bin #選擇row模式 binlog-format=ROW #配置mysql replaction需要定義,不能和canal的slaveId重復 server_id=1 character-set-server=utf8 collation-server=utf8_general_ci
添加:
[mysql] default-character-set = utf8 [mysql.server] default-character-set = utf8 [mysqld_safe] default-character-set = utf8 [client] default-character-set = utf8
7. 重啟mysql
.6\bin>sc delete mysql \bin>net stop mysql \bin>mysqld --install mysql --defaults-file="C:\Program Files\MySQL\MySQL Server 5.6\my.ini"
8. 查看是否開啟binlog
show variables like'log_%';
On:表示已開啟
9. 創建數據庫canal用戶
官網是%,%是對所有非本地主機授權,不包括localhost。由於我們是在windows本機上做,所以需要配置為localhost.
10. 修改canal-deploy-> conf\example里的instance.properties
這兩個新添的配置可以注解調,還不太明白具體的用處
canal.instance.tsdb.dbUsername=canal canal.instance.tsdb.dbPassword=canal
注:這里的slaveId=1234不能和my.ini的一樣
11. 在cmd下啟動canal-deploy
如果沒有報錯那就是啟動成功了
12. 創建canal-client服務
Pom或者gradle。主要依賴:
compile group: 'org.jetbrains', name: 'annotations', version: '13.0' compile group: 'com.alibaba.otter', name: 'canal.client', version: '1.0.25'
13. 編寫客戶端類
package com.shao.demo.canalclient;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.client.*;
import org.jetbrains.annotations.NotNull;
/**
* @author zhiqi.shao
* @Date 2018/6/4 18:29
*/
public class ClientSample {
public static void main(String args[]) {
// 創建鏈接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
11111), "example", "", "");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*");
connector.rollback();
int totalEmtryCount = 1200;
while (emptyCount < totalEmtryCount) {
Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據 long batchId =
message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
emptyCount++;
System.out.println("empty count : " + emptyCount);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交確認 // connector.rollback(batchId); // 處理失敗, 回滾數據 }
System.out.println("empty too many times, exit");
} finally {
connector.disconnect();
}
}
private static void printEntry(@NotNull List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
printColumn(rowData.getBeforeColumnsList());
} else if (eventType == EventType.INSERT) {
printColumn(rowData.getAfterColumnsList());
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
printColumn(rowData.getAfterColumnsList());
}
}
}
}
private static void printColumn(@NotNull List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}
14. 啟動客戶端
use canal_test;
CREATE TABLE user (
uid INT(4) PRIMARY KEY NOT NULL AUTO_INCREMENT,
name VARCHAR(10) NOT NULL
);
insert into user (name) values('shaoshao');

15. 關於update
rowData.getAfterColumnsList()
16. 常見錯誤
- 服務端:com.alibaba.otter.canal.parse.exception.CanalParseException: can't find start position for example
是由於你改了配置文件,導致meta.dat 中保存的位點信息和數據庫的位點信息不一致;導致canal抓取不到數據庫的動作;
解決方法:刪除meta.dat刪除,再重啟canal,問題解決; - 客戶端:java.lang.OutOfMemoryError: Java heap space
canal消費端掛了太久,在zk對應conf下節點的
/otter/canal/destinations/test_db/1001/cursor 位點信息是很早以前,導致重啟canal時,從很早以前的位點開始消費,導致canal服務器內存爆掉 - 服務端ERROR c.a.otter.canal.server.netty.handler.SessionHandler - something goes wrong with channel:[id: 0x0191aafd, /192.168.10.68:49502 => /192.168.10.68:11111], exception=java.io.IOException:
當客戶端停掉后,canal服務端會報此異常
客戶端:com.alibaba.otter.canal.protocol.exception.CanalClientException: something goes wrong with reason: something goes wrong with channel:[id: 0x01311037, /192.168.10.68:52086 => /192.168.10.68:11111], exception=com.alibaba.otter.canal.server.exception.CanalServerException: destination:example should start first
當服務端停掉或者重啟中,客戶端連不上就會拋出此異常。場景修改了服務點的配置文件此時服務端會重啟,客戶端就會報次異常
17. 消費過濾
canalConnector.subscribe("canal_test\..");//客戶端只消費canal_test庫的數據變化
subscribe(filter)方法;有的話,filter需要和instance.properties的canal.instance.filter.regex一致,否則subscribe的filter會覆蓋instance的配置,如果subscribe的filter是.\..*,那么相當於你消費了所有的更新數據。
18. 配置
- 【instance.properties配置定義優先級高於canal.properties】
- 修改了服務端配置文件,服務器會自動重啟
19. 關於HA機制的設計
canal server: 為了減少對mysql dump的請求,不同server上的instance要求同一時間只能有一個處於running,其他的處於standby狀態.
canal client: 為了保證有序性,一份instance同一時間只能由一個canal client進行get/ack/rollback操作,否則客戶端接收無法保證有序。
20. Canal的工作原理
原理相對比較簡單:
- canal模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議
- mysql master收到dump請求,開始推送binary log給slave(也就是canal)
- canal解析binary log對象(原始為byte流)
21. 鏈接方式(參考:http://www.importnew.com/25189.html)
1. HA配置架構圖
2. 單連
3. 兩個client+兩個instance+1個mysql
當mysql變動時,兩個client都能獲取到變動
4. 一個server+兩個instance+兩個mysql+兩個client
5. instance****的standby配置
Standby:備庫
22. 總結
這里總結了一下Canal的一些點,僅供參考:
- 原理:模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議;mysql master收到dump請求,開始推送binary log給slave(也就是canal);解析binary log對象(原始為byte流)
- 重復消費問題:在消費端解決。
- 采用開源的open-replicator來解析binlog
- canal需要維護EventStore,可以存取在Memory, File, zk
- canal需要維護客戶端的狀態,同一時刻一個instance只能有一個消費端消費
- 數據傳輸格式:protobuff
- 支持binlog format 類型:statement, row, mixed. 多次附加功能只能在row下使用,比如otter
- binlog position可以支持保存在內存,文件,zk中
- instance啟動方式:rpc/http; 內嵌
- 有ACK機制
- 無告警,無監控,這兩個功能都需要對接外部系統
- 方便快速部署。
