How canal works:
The principle is relatively simple:
canal emulates the MySQL slave interaction protocol, pretends to be a MySQL slave, and sends the dump protocol to the MySQL master
the MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal)
canal parses the binary log objects (originally a raw byte stream)
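For reference, a minimal consumer sketch using canal's Java client is shown below; it assumes a canal server is already running locally on port 11111 with an instance named example, and the address, destination name and credentials are placeholders:

import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

public class SimpleCanalConsumer {
    public static void main(String[] args) throws InterruptedException {
        // canal server address, destination (instance name), username, password are placeholders
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        try {
            connector.connect();
            connector.subscribe(".*\\..*");                      // all schemas / all tables
            while (true) {
                Message message = connector.getWithoutAck(100);  // fetch up to 100 parsed entries
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    Thread.sleep(1000);                          // nothing new yet
                } else {
                    message.getEntries().forEach(entry ->
                        System.out.println(entry.getHeader().getSchemaName() + "."
                                + entry.getHeader().getTableName() + " : " + entry.getEntryType()));
                }
                connector.ack(batchId);                          // confirm so the cursor advances
            }
        } finally {
            connector.disconnect();
        }
    }
}

The server side does the protocol emulation and binlog parsing described above; the client only pulls the already-parsed entries and acks them.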
Architecture
Notes:
server: one running canal process, corresponding to one JVM
instance: one data queue (one server hosts 1..n instances)
an instance represents an actual running data queue and contains the EventParser, EventSink, EventStore and related components.
Instance modules:
eventParser (data source access; emulates the slave protocol to interact with the master and parses the protocol)
eventSink (the link between Parser and Store; performs data filtering, processing and distribution)
eventStore (data storage)
metaManager (manages incremental subscription & consumption metadata)
Put simply:
MySQL's binlog is stored as multiple files, so locating a LogEvent requires both the binlog filename and the binlog position
By the way it is generated, MySQL's binlog format mainly comes in three flavors: statement-based, row-based and mixed.
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.00 sec)
canal currently supports incremental subscription for all formats (but when used for synchronization, statement-based binlog only carries the SQL text without row data, so the original changes cannot be recovered; ROW format is therefore generally recommended)
EventParser design
Rough flow:
the whole parse process can roughly be divided into the following steps:
Connection fetches the last successfully parsed position (on first start, the initially configured position or the database's current binlog position is used)
Connection establishes the link and sends the BINLOG_DUMP command (packet layout in the comments below; a byte-level sketch follows this list)
// 0. write command number
// 1. write 4 bytes bin-log position to start at
// 2. write 2 bytes bin-log flags
// 3. write 4 bytes server id of the slave
// 4. write bin-log file name
MySQL starts pushing the binary log
the received binary log is parsed by the binlog parser, which enriches it with some extra information
// adds column names, column types, primary key info, handling of unsigned types
the events are handed to the EventSink module for storage; this is a blocking call until the store succeeds
after a successful store, the binary log position is recorded periodically
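To make the packet layout in the comments above concrete, here is an illustrative little-endian sketch of the COM_BINLOG_DUMP payload. It is not canal's actual code; the class and method names are made up for this note:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class BinlogDumpPacketSketch {

    // write an integer little-endian using the given number of bytes (MySQL protocol byte order)
    private static void writeIntLE(ByteArrayOutputStream out, int value, int bytes) {
        for (int i = 0; i < bytes; i++) {
            out.write((value >>> (8 * i)) & 0xFF);
        }
    }

    public static byte[] buildDumpPayload(String binlogFile, long position, int slaveServerId) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(0x12);                                          // 0. COM_BINLOG_DUMP command number
        writeIntLE(out, (int) position, 4);                       // 1. 4 bytes bin-log position to start at
        writeIntLE(out, 0, 2);                                    // 2. 2 bytes bin-log flags
        writeIntLE(out, slaveServerId, 4);                        // 3. 4 bytes server id of the slave (canal)
        out.write(binlogFile.getBytes(StandardCharsets.UTF_8));   // 4. bin-log file name
        return out.toByteArray();
    }
}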
EventSink design
Notes:
data filtering: wildcard filter patterns on table names, column content, etc.
data routing/distribution: handles 1:n (one parser feeding multiple stores)
data merging: handles n:1 (multiple parsers feeding one store)
data processing: extra processing before the data enters the store, e.g. a join
EventStore design
1. Currently only the in-memory mode is implemented; local file storage and a mixed mode are planned
2. The implementation borrows the RingBuffer idea from Disruptor
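As a rough illustration of that idea (my own simplification, not canal's actual MemoryEventStoreWithBuffer), a Disruptor-style ring buffer is a fixed-size array addressed by ever-increasing sequences, with three cursors: put (producer), get (consumer fetch) and ack (consumer confirm):

public class RingBufferSketch<T> {
    private final Object[] slots;
    private final int mask;          // buffer size must be a power of two
    private long putSequence = -1;   // last slot written by the producer (EventSink)
    private long getSequence = -1;   // last slot handed to the consumer (client get)
    private long ackSequence = -1;   // last slot confirmed by the consumer (client ack)

    public RingBufferSketch(int sizePowerOfTwo) {
        this.slots = new Object[sizePowerOfTwo];
        this.mask = sizePowerOfTwo - 1;
    }

    public synchronized boolean tryPut(T value) {
        long next = putSequence + 1;
        if (next - ackSequence > slots.length) {
            return false;            // full: unacked entries still occupy the slots
        }
        slots[(int) (next & mask)] = value;
        putSequence = next;
        return true;
    }

    @SuppressWarnings("unchecked")
    public synchronized T tryGet() {
        long next = getSequence + 1;
        if (next > putSequence) {
            return null;             // nothing new to read
        }
        getSequence = next;
        return (T) slots[(int) (next & mask)];
    }

    public synchronized void ack() {
        ackSequence = getSequence;   // release everything fetched so far for reuse
    }
}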
HA (High Availability) mechanism design
canal's HA has two parts; canal server and canal client each have their own HA implementation
canal server: to reduce MySQL dump requests, only one instance (for a given destination) across the different servers may be running at a time; the others stay in standby.
canal client: to guarantee ordering, only one canal client may perform get/ack/rollback against a given instance at a time, otherwise ordered delivery to the client cannot be guaranteed.
The whole HA mechanism relies mainly on two ZooKeeper features: watchers and EPHEMERAL nodes (whose lifetime is bound to the session).
Canal Server:
Rough steps (a minimal ZooKeeper sketch follows these steps):
(1) Whenever a canal server wants to start a canal instance, it first makes a start attempt against ZooKeeper (implementation: create an EPHEMERAL node; whoever creates it successfully is allowed to start, much like a distributed lock)
(2) After creating the ZooKeeper node successfully, that canal server starts the corresponding canal instance; the servers that did not create the node keep their instance in standby
(3) Once ZooKeeper detects that the node created by canal server A has disappeared, it immediately notifies the other canal servers to repeat step (1) and re-elect a canal server to start the instance.
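A minimal sketch of that race using the same zkclient library canal depends on (the ZooKeeper address, destination and node data are placeholders; the real ServerRunningMonitor also watches for node deletion instead of polling):

import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;

public class RunningNodeSketch {

    public static void main(String[] args) throws InterruptedException {
        ZkClient zk = new ZkClient("127.0.0.1:2181");                    // assumed ZooKeeper address
        String runningPath = "/otter/canal/destinations/example/running";

        while (true) {
            try {
                zk.createEphemeral(runningPath, "server-A:11111");       // the distributed-lock attempt of step (1)
                System.out.println("won the race, start the canal instance here");   // step (2)
                break;
            } catch (ZkNodeExistsException e) {
                System.out.println("another server holds the running node, stay standby");
                Thread.sleep(5000);   // step (3) in real canal: watch the node and retry when it disappears
            }
        }
    }
}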
Canal Client:
Every time a canal client connects, it first asks ZooKeeper which server is currently running the canal instance, then establishes a connection to it; once that connection becomes unavailable, it retries the connect.
The canal client works much like the canal server: it also relies on contending for ZooKeeper EPHEMERAL nodes for control.
(ephemeral: short-lived, transient)
The problem of binlogs being purged
Aliyun RDS has its own binlog purge policy, which is more aggressive than on a self-hosted MySQL: by default binlogs are purged after roughly 18 hours and uploaded to OSS. The retention can be adjusted in the console, or the application can download older binlogs from OSS.
Where to configure the AK/SK in canal:
# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =
Notes:
Compared with a plain MySQL setup, this adds the Aliyun AK/SK/instance id needed for RDS OSS binlogs (these values can be ignored if you do not need canal to automatically download binlogs from OSS after local binlogs older than 18 hours are purged)
canal.instance.rds.accesskey: the AK of the Aliyun account (can be ignored if the automatic OSS binlog download above is not needed)
canal.instance.rds.secretkey: the SK of the Aliyun account (same note as above)
canal.instance.rds.instanceId: the instance id of the Aliyun RDS (same note as above)
https://github.com/alibaba/canal/wiki/aliyun-RDS-QuickStart
Where to configure the AK/SK in otter:
Public-cloud API authentication: an AK/SK primer
1 Public-cloud API authentication methods
There are generally the following authentication methods:
Token authentication
AK/SK authentication
RSA asymmetric encryption
AK/SK is described below.
2 AK/SK
2.1 How AK/SK works
The cloud side verifies the identity of a request's sender by means of signing with an Access Key Id / Secret Access Key.
The Access Key Id (AK) identifies the user; the Secret Access Key (SK) is the key the user uses to sign the authentication string and the cloud vendor uses to verify it, so the SK must be kept secret. AK/SK is based on symmetric cryptography.
2.2 How AK/SK is used
After the cloud side receives the user's request, it uses the SK corresponding to the given AK and the same signing mechanism to generate an authentication string, then compares it with the authentication string carried in the request. If the strings match, the system regards the user as authorized and performs the operation; if they differ, the system ignores the operation and returns an error code.
2.3 Flow
Check whether the user's request contains the Authorization string; if it does, continue with the next step.
Based on the HTTP request, generate the Signature string with the same algorithm.
Compare the server-generated Signature with the one provided by the user: if they differ, authentication fails and the request is rejected; if they match, authentication succeeds and the system carries out the requested operation.
In essence:
Client:
1. Build the HTTP request (including the access key);
2. Compute a signature over the request content using the secret access key and attach it to the request;
3. Send the request to the server.
Server:
1. Use the access key that was sent to look up the corresponding secret key in its database;
2. Compute the signature over the request content with that secret key, using the same algorithm as the client's step 2;
3. Compare the signature sent by the user with the one computed by the server; if they are identical, authentication passes, otherwise it fails.
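A minimal signing sketch in Java; HMAC-SHA256 and Base64 are only assumptions here, since each vendor defines its own canonical request string and algorithm, and the keys below are placeholders:

import java.nio.charset.StandardCharsets;
import java.util.Base64;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class AkSkSignSketch {

    // both client and server run this over the same canonical request string
    static String sign(String requestContent, String secretAccessKey) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(secretAccessKey.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
        return Base64.getEncoder().encodeToString(mac.doFinal(requestContent.getBytes(StandardCharsets.UTF_8)));
    }

    public static void main(String[] args) throws Exception {
        String accessKeyId = "AKIDEXAMPLE";          // sent with the request, identifies the user
        String secretAccessKey = "secret-example";   // never sent; known only to the user and the vendor
        String request = "GET /v1/instances?region=cn-hangzhou";

        String clientSignature = sign(request, secretAccessKey);   // client side
        String serverSignature = sign(request, secretAccessKey);   // server side, after looking up the SK by AK
        System.out.println(accessKeyId + " authenticated = " + clientSignature.equals(serverSignature));
    }
}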
Token
Token authentication
When using Token authentication, the user first obtains a token and then adds an "X-XXX-Token" header to the business API request when calling the interface.
Flow
Send a request to obtain the IAM endpoint and the region name in the message body.
Obtain the token: after the request succeeds, the value of the "X-YYY-Token" response header is the token.
Call the business API with an "X-XXX-Token" request header set to the token obtained in step 2.
https://blog.csdn.net/makenothing/java/article/details/81158481
2020-05-29 17:06:25.217 ERROR [bootstrap,,,] 12 --- [37.225.101:2181] - [] org.I0Itec.zkclient.ZkEventThread : Error handling event ZkEvent[Data of /otter/canal/destinations/zaf-security-admin/1001/running changed sent to com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor$1@731b42f4]
com.alibaba.otter.canal.protocol.exception.CanalClientException: something goes wrong in initRunning method.
    at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor.initRunning(ClientRunningMonitor.java:156) ~[canal.client-1.1.3.jar!/:na]
    at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor$1.handleDataDeleted(ClientRunningMonitor.java:72) ~[canal.client-1.1.3.jar!/:na]
    at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:825) ~[zkclient-0.10.jar!/:na]
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:72) ~[zkclient-0.10.jar!/:na]
Caused by: com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: end of stream when reading header
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:189) ~[canal.client-1.1.3.jar!/:na]
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.access$000(SimpleCanalConnector.java:50) ~[canal.client-1.1.3.jar!/:na]
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector$1.processActiveEnter(SimpleCanalConnector.java:422) ~[canal.client-1.1.3.jar!/:na]
    at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor.processActiveEnter(ClientRunningMonitor.java:221) ~[canal.client-1.1.3.jar!/:na]
    at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor.initRunning(ClientRunningMonitor.java:123) ~[canal.client-1.1.3.jar!/:na]
    ... 3 common frames omitted
Caused by: java.io.IOException: end of stream when reading header
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.read(SimpleCanalConnector.java:404) ~[canal.client-1.1.3.jar!/:na]
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.readNextPacket(SimpleCanalConnector.java:392) ~[canal.client-1.1.3.jar!/:na]
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.readNextPacket(SimpleCanalConnector.java:376) ~[canal.client-1.1.3.jar!/:na]
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:151) ~[canal.client-1.1.3.jar!/:na]
    ... 7 common frames omitted
Problem 1:
ERROR c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - dump address /192.168.1.50:3306 has an error, retrying. caused by
com.alibaba.otter.canal.parse.exception.CanalParseException: can't find start position for example
Cause: the position stored in meta.dat no longer matches the database's position (MySQL no longer retains that position), so canal cannot capture any database changes.
Solution: delete meta.dat and restart canal; problem solved.
Cluster operation: go to the ZooKeeper ensemble used by canal, delete the node /otter/canal/destinations/xxxxx/1001/cursor, then restart canal to recover.
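If you prefer to script the cluster fix, a tiny sketch with the same zkclient library canal uses (the ZooKeeper address is a placeholder, and "xxxxx" stands for the destination name; zkCli.sh works just as well):

import org.I0Itec.zkclient.ZkClient;

public class DeleteCursorNode {

    public static void main(String[] args) {
        ZkClient zk = new ZkClient("127.0.0.1:2181");   // assumed ZooKeeper address
        // same node as in the manual fix above
        zk.deleteRecursive("/otter/canal/destinations/xxxxx/1001/cursor");
        zk.close();
    }
}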
Problem 2:
java.lang.OutOfMemoryError: Java heap space
The canal consumer had been down for too long, so the cursor stored at the ZooKeeper node
/otter/canal/destinations/test_db/1001/cursor pointed to a very old position; on restart canal began consuming from that old position, and the canal server ran out of heap.
A related symptom: while listening for database changes, only TransactionBegin/TransactionEnd events arrive and no data-carrying EventType is received;
the likely cause is canal.instance.filter.black.regex=.*\\..* ; change it to canal.instance.filter.black.regex= (empty) and restart to verify.
Problem 3:
ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:fdyb_db[com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.google.common.collect.ComputationException: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table meta:`mysql`.`pds_4490277`
Caused by: com.google.common.collect.ComputationException: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table meta:`mysql`.`pds_4490277`
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table meta:`mysql`.`pds_4490277`
Caused by: java.io.IOException: ErrorPacket [errorNumber=1142, fieldCount=-1, message=SELECT command denied to user 'cy_canal'@'11.217.0.224' for table 'pds_4490277', sqlState=42000, sqlStateMarker=#]
with command: desc `mysql`.`pds_4490277`
Analysis: the mysql system schema requires elevated privileges; canal's query against that table failed, so the binlog position could not advance
Fix: add every table under mysql to the blacklist: canal.instance.filter.black.regex = mysql\\..* ; after this change the canal cluster recovers without a restart.
Other notes: check whether the CanalConnector calls the subscribe(filter) method;
if it does, the filter must match canal.instance.filter.regex in instance.properties, otherwise the subscribe filter overrides the instance configuration, and a subscribe filter of .*\\..* means you are effectively consuming every change.
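For illustration, a hedged sketch of keeping the client-side subscribe filter consistent with instance.properties (the address, destination and table name are placeholders):

import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;

public class SubscribeFilterExample {

    public static void main(String[] args) {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        connector.connect();
        // keep this value identical to canal.instance.filter.regex in instance.properties;
        // passing ".*\\..*" here would override the server-side filter and consume every change
        connector.subscribe("db1\\.orders");
    }
}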
Problem 4:
Symptom: after changes in the database, the canal application does not see the binlog and the data is not consumed or processed;
Diagnosis:
1. Checked the logs of the canal server, the canal application and the ZooKeeper servers: no anomalies;
2. Checked the MySQL and ES servers: no anomalies;
3. Checked the configuration of the canal server and the canal application and found a problem in the canal server's canal.properties;
Cause: canal.properties had both canal.ip and canal.zkServers configured. When canal.ip is set in a ZooKeeper-cluster deployment, the canal server is reached by IP first, which bypasses the ZooKeeper coordination, and the cursor file is then kept locally;
once that local cursor file goes wrong, nothing anywhere logs an error and the problem becomes very hard to trace;
Fix: set canal.ip to empty, stop the canal server and the canal application, delete the nodes on ZooKeeper, then restart the canal server and the canal application; problem solved.
https://blog.csdn.net/my201110lc/article/details/77885720
Problem 5:
2020-10-26 14:02:52.510 [destination = ddbuat , address = hym-mysql-uat-cluster-private.rwlb.rds.aliyuncs.com/10.10.3.3:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.006381,position=356075532,serverId=14457169,gtid=,timestamp=1603682791000] cost : 9ms , the next step is binlog dump
2020-10-26 14:02:52.520 [MultiStageCoprocessor-Parser-ddbuat-6] ERROR com.alibaba.otter.canal.common.utils.NamedThreadFactory - from MultiStageCoprocessor-Parser-ddbuat-6
com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table:db.hymnews,33 vs 25
2020-10-26 14:02:52.523 [destination = ddbuat , address = hym-mysql-uat-cluster-private.rwlb.rds.aliyuncs.com/10.10.3.3:3306 , EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - dump address hym-mysql-uat-cluster-private.rwlb.rds.aliyuncs.com/10.10.3.3:3306 has an error, retrying. caused by
com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table:db.hymnews,33 vs 25
2020-10-26 14:02:52.525 [MultiStageCoprocessor-Parser-ddbuat-11] ERROR com.alibaba.otter.canal.common.utils.NamedThreadFactory - from MultiStageCoprocessor-Parser-ddbuat-11
com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table:db.hymnews,33 vs 25
2020-10-26 14:02:52.525 [destination = ddbuat , address = hym-mysql-uat-cluster-private.rwlb.rds.aliyuncs.com/10.10.3.3:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:ddbuat[com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table:db.hymnews,33 vs 25
Cause: tsdb was enabled and a snapshot of the database's table schemas had been stored
Fix:
Delete the stored snapshot data; canal will store a fresh copy of the database's current table schemas and the error above goes away
delete from meta_snapshot where destination='<the destination reported in the error>'
The tsdb database is specified by the following configuration:
# table meta tsdb info
canal.instance.tsdb.enable=true
canal.instance.tsdb.url=jdbc:mysql://rm-xxx.mysql.rds.aliyuncs.com:3306/canal_tsdb?useSSL=false
canal.instance.tsdb.dbUsername=admin
canal.instance.tsdb.dbPassword=pwd
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
canal's storage paths in ZooKeeper:
/**
 * Storage structure:
 *
 * <pre>
 * /otter
 *     canal
 *         cluster
 *         destinations
 *             dest1
 *                 running (EPHEMERAL)
 *                 cluster
 *                 client1
 *                     running (EPHEMERAL)
 *                     cluster
 *                     filter
 *                     cursor
 *                     mark
 *                         1
 *                         2
 *                         3
 * </pre>
 *
 * @author zebin.xuzb @ 2012-6-21
 * @version 1.0.0
 */
public class ZookeeperPathUtils {

    public static final String ZOOKEEPER_SEPARATOR = "/";

    public static final String OTTER_ROOT_NODE = ZOOKEEPER_SEPARATOR + "otter";

    public static final String CANAL_ROOT_NODE = OTTER_ROOT_NODE + ZOOKEEPER_SEPARATOR + "canal";

    public static final String DESTINATION_ROOT_NODE = CANAL_ROOT_NODE + ZOOKEEPER_SEPARATOR + "destinations";

    public static final String FILTER_NODE = "filter";

    public static final String BATCH_MARK_NODE = "mark";

    public static final String PARSE_NODE = "parse";

    public static final String CURSOR_NODE = "cursor";

    public static final String RUNNING_NODE = "running";

    public static final String CLUSTER_NODE = "cluster";

    public static final String DESTINATION_NODE = DESTINATION_ROOT_NODE + ZOOKEEPER_SEPARATOR + "{0}";

    public static final String DESTINATION_PARSE_NODE = DESTINATION_NODE + ZOOKEEPER_SEPARATOR + PARSE_NODE;

    public static final String DESTINATION_CLIENTID_NODE = DESTINATION_NODE + ZOOKEEPER_SEPARATOR + "{1}";

    public static final String DESTINATION_CURSOR_NODE = DESTINATION_CLIENTID_NODE + ZOOKEEPER_SEPARATOR + CURSOR_NODE;

    public static final String DESTINATION_CLIENTID_FILTER_NODE = DESTINATION_CLIENTID_NODE + ZOOKEEPER_SEPARATOR + FILTER_NODE;

    public static final String DESTINATION_CLIENTID_BATCH_MARK_NODE = DESTINATION_CLIENTID_NODE + ZOOKEEPER_SEPARATOR + BATCH_MARK_NODE;

    public static final String DESTINATION_CLIENTID_BATCH_MARK_WITH_ID_PATH = DESTINATION_CLIENTID_BATCH_MARK_NODE + ZOOKEEPER_SEPARATOR + "{2}";

    /**
     * running node of the server currently providing service
     */
    public static final String DESTINATION_RUNNING_NODE = DESTINATION_NODE + ZOOKEEPER_SEPARATOR + RUNNING_NODE;

    /**
     * running node of the client currently working
     */
    public static final String DESTINATION_CLIENTID_RUNNING_NODE = DESTINATION_CLIENTID_NODE + ZOOKEEPER_SEPARATOR + RUNNING_NODE;

    /**
     * cluster list of the whole canal server farm
     */
    public static final String CANAL_CLUSTER_ROOT_NODE = CANAL_ROOT_NODE + ZOOKEEPER_SEPARATOR + CLUSTER_NODE;

    public static final String CANAL_CLUSTER_NODE = CANAL_CLUSTER_ROOT_NODE + ZOOKEEPER_SEPARATOR + "{0}";

    /**
     * cluster list working on a given destination
     */
    public static final String DESTINATION_CLUSTER_ROOT = DESTINATION_NODE + ZOOKEEPER_SEPARATOR + CLUSTER_NODE;

    public static final String DESTINATION_CLUSTER_NODE = DESTINATION_CLUSTER_ROOT + ZOOKEEPER_SEPARATOR + "{1}";

    public static String getDestinationPath(String destinationName) {
        return MessageFormat.format(DESTINATION_NODE, destinationName);
    }

    public static String getClientIdNodePath(String destinationName, short clientId) {
        return MessageFormat.format(DESTINATION_CLIENTID_NODE, destinationName, String.valueOf(clientId));
    }

    public static String getFilterPath(String destinationName, short clientId) {
        return MessageFormat.format(DESTINATION_CLIENTID_FILTER_NODE, destinationName, String.valueOf(clientId));
    }

    public static String getBatchMarkPath(String destinationName, short clientId) {
        return MessageFormat.format(DESTINATION_CLIENTID_BATCH_MARK_NODE, destinationName, String.valueOf(clientId));
    }

    public static String getBatchMarkWithIdPath(String destinationName, short clientId, Long batchId) {
        return MessageFormat.format(DESTINATION_CLIENTID_BATCH_MARK_WITH_ID_PATH,
                                    destinationName,
                                    String.valueOf(clientId),
                                    getBatchMarkNode(batchId));
    }

    public static String getCursorPath(String destination, short clientId) {
        return MessageFormat.format(DESTINATION_CURSOR_NODE, destination, String.valueOf(clientId));
    }

    public static String getCanalClusterNode(String node) {
        return MessageFormat.format(CANAL_CLUSTER_NODE, node);
    }

    /**
     * running node of the server currently providing service
     */
    public static String getDestinationServerRunning(String destination) {
        return MessageFormat.format(DESTINATION_RUNNING_NODE, destination);
    }

    /**
     * running node of the client currently working
     */
    public static String getDestinationClientRunning(String destination, short clientId) {
        return MessageFormat.format(DESTINATION_CLIENTID_RUNNING_NODE, destination, String.valueOf(clientId));
    }

    public static String getDestinationClusterNode(String destination, String node) {
        return MessageFormat.format(DESTINATION_CLUSTER_NODE, destination, node);
    }

    public static String getDestinationClusterRoot(String destination) {
        return MessageFormat.format(DESTINATION_CLUSTER_ROOT, destination);
    }

    public static String getParsePath(String destination) {
        return MessageFormat.format(DESTINATION_PARSE_NODE, destination);
    }

    /**
     * convert the client node name to a short clientId
     */
    public static short getClientId(String clientNode) {
        return Short.valueOf(clientNode);
    }

    /**
     * convert the batch mark node name to a Long batchId
     */
    public static long getBatchMarkId(String batchMarkNode) {
        return Long.valueOf(batchMarkNode);
    }

    /**
     * convert a batchId into its zookeeper node name
     */
    public static String getBatchMarkNode(Long batchId) {
        return StringUtils.leftPad(String.valueOf(batchId.intValue()), 10, '0');
    }
}
com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils