canal.instance.filter.regex:
mysql 數據解析關注的表,Perl正則表達式.
多個正則之間以逗號(,)分隔,轉義符需要雙斜杠(\\)
常見例子:
1. 所有表:.* or .*\\..*
2. canal schema下所有表: canal\\..*
3. canal下的以canal打頭的表:canal\\.canal.*
4. canal schema下的一張表:canal\\.test1
5. 多個規則組合使用:canal\\..*,mysql.test1,mysql.test2 (逗號分隔)
canal.instance.filter.black.regex : mysql 數據解析表的黑名單,表達式規則見白名單的規則
https://github.com/alibaba/canal/wiki/AdminGuide
幾點說明:
1. mysql鏈接時的起始位置
canal.instance.master.journal.name + canal.instance.master.position : 精確指定一個binlog位點,進行啟動
canal.instance.master.timestamp : 指定一個時間戳,canal會自動遍歷mysql binlog,找到對應時間戳的binlog位點后,進行啟動
不指定任何信息:默認從當前數據庫的位點,進行啟動。(show master status)
2. mysql解析關注表定義
標准的Perl正則,注意轉義時需要雙斜杠:\\
3. mysql鏈接的編碼
目前canal版本僅支持一個數據庫只有一種編碼,如果一個庫存在多個編碼,需要通過filter.regex配置,將其拆分為多個canal instance,為每個instance指定不同的編碼
在介紹instance配置之前,先了解一下canal如何維護一份增量訂閱&消費的關系信息:
解析位點 (parse模塊會記錄,上一次解析binlog到了什么位置,對應組件為:CanalLogPositionManager)
消費位點 (canal server在接收了客戶端的ack后,就會記錄客戶端提交的最后位點,對應的組件為:CanalMetaManager)

canal-admin模式,canal.serverMode必須配,不然會報NPE
2020-06-23 00:00:14.984 [destination = omssaps , address = mysql-uat-new-mysqlha-readonly.zkh-uat.svc.cluster.local/192.168.228.211:3306 , EventParser] WARN c.a.o.c.p.inbo und.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position 2020-06-23 00:00:15.024 [destination = omssaps , address = mysql-uat-new-mysqlha-readonly.zkh-uat.svc.cluster.local/192.168.228.211:3306 , EventParser] WARN c.a.o.c.p.inbo und.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position {"identity":{"slaveId":-1,"sourceAddress":{"address":"mysql-uat-new-mysqlha-readonly.zkh-uat.svc.cluster.local","port":3306}},"postion":{"gtid":"","included":false,"journa lName":"mysql-uat-new-mysqlha-0-bin.001768","position":345523555,"serverId":100,"timestamp":1585193491000}} 2020-06-23 00:00:15.042 [destination = omssaps , address = mysql-uat-new-mysqlha-readonly.zkh-uat.svc.cluster.local/192.168.228.211:3306 , EventParser] WARN c.a.o.c.p.inbo und.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-uat-new-mysqlha-0-bin.001768,position=345523 555,serverId=100,gtid=,timestamp=1585193491000] cost : 58ms , the next step is binlog dump 2020-06-23 00:00:15.056 [destination = omssaps , address = mysql-uat-new-mysqlha-readonly.zkh-uat.svc.cluster.local/192.168.228.211:3306 , EventParser] ERROR c.a.o.canal.pa rse.inbound.mysql.dbsync.DirectLogFetcher - I/O error while reading from client socket java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102) ~[canal.parse-1.1.3.jar:na] at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:235) [canal.parse-1.1.3.jar:na] at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:257) [canal.parse-1.1.3.jar:na] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181] 2020-06-23 00:00:15.056 [destination = omssaps , address = mysql-uat-new-mysqlha-readonly.zkh-uat.svc.cluster.local/192.168.228.211:3306 , EventParser] ERROR c.a.o.c.p.inbo und.mysql.rds.RdsBinlogEventParserProxy - dump address mysql-uat-new-mysqlha-readonly.zkh-uat.svc.cluster.local/192.168.228.211:3306 has an error, retrying. caused by java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log file name in binary log index file at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102) ~[canal.parse-1.1.3.jar:na] at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:235) ~[canal.parse-1.1.3.jar:na] at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:257) ~[canal.parse-1.1.3.jar:na] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181] 2020-06-23 00:00:15.057 [destination = omssaps , address = mysql-uat-new-mysqlha-readonly.zkh-uat.svc.cluster.local/192.168.228.211:3306 , EventParser] ERROR com.alibaba.ot ter.canal.common.alarm.LogAlarmHandler - destination:omssaps[java.io.IOException: Received error packet: errno = 1236, sqlstate = HY000 errmsg = Could not find first log fi le name in binary log index file at com.alibaba.otter.canal.parse.inbound.mysql.dbsync.DirectLogFetcher.fetch(DirectLogFetcher.java:102) at com.alibaba.otter.canal.parse.inbound.mysql.MysqlConnection.dump(MysqlConnection.java:235) at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:257) at java.lang.Thread.run(Thread.java:748) ]
http://whitesock.iteye.com/blog/1329616
http://www.iteye.com/topic/1129002 七鋒已經fix了canal的,要合到dbsync里面。
http://www.bitscn.com/pdb/mysql/201404/228411.html
http://mechanics.flite.com/blog/2014/04/29/disabling-binlog-checksum-for-mysql-5-dot-5-slash-5-dot-6-master-master-replication/
查詢資料發現mysql版本為5.6時,
這個錯誤一般出現在master5.6,slave在低版本的情況下。這是由於5.6使用了crc32做binlog的checksum;
當一個event被寫入binary log(二進制日志)的時候,checksum也同時寫入binary log,然后在event通過網絡傳輸到從服務器(slave)之后,再在從服務器中對其進行驗證並寫入從服務器的relay log.
由於每一步都記錄了event和checksum,所以我們可以很快地找出問題所在。
在master1中設置binlog_checksum =none;
mysql> show variables like "%sum%";
+---------------------------+--------+
| Variable_name | Value |
+---------------------------+--------+
| binlog_checksum | CRC32 |
| innodb_checksum_algorithm | innodb |
| innodb_checksums | ON |
| master_verify_checksum | OFF |
| slave_sql_verify_checksum | ON |
+---------------------------+--------+
5 rows in set (0.00 sec)
mysql> set global binlog_checksum='NONE'
Query OK, 0 rows affected (0.09 sec)
mysql> show variables like "%sum%";
+---------------------------+--------+
| Variable_name | Value |
+---------------------------+--------+
| binlog_checksum | NONE |
| innodb_checksum_algorithm | innodb |
| innodb_checksums | ON |
| master_verify_checksum | OFF |
| slave_sql_verify_checksum | ON |
+---------------------------+--------+
————————————————
版權聲明:本文為CSDN博主「arkblue」的原創文章,遵循CC 4.0 BY-SA版權協議,轉載請附上原文出處鏈接及本聲明。
原文鏈接:https://blog.csdn.net/arkblue/article/details/44280647
2020-06-23 20:36:04.987 [destination = omssaps , address = mysql.zkh360.com/120.27.222.64:13306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandl er - destination:omssaps[com.google.common.collect.ComputationException: com.alibaba.fastjson.JSONException: deserialize inet adress error at com.google.common.collect.MapMaker$ComputingMapAdapter.get(MapMaker.java:889) at com.alibaba.otter.canal.meta.MemoryMetaManager.getCursor(MemoryMetaManager.java:91) at com.alibaba.otter.canal.meta.PeriodMixedMetaManager.getCursor(PeriodMixedMetaManager.java:150) at com.alibaba.otter.canal.parse.index.MetaLogPositionManager.getLatestIndexBy(MetaLogPositionManager.java:57) at com.alibaba.otter.canal.parse.index.FailbackLogPositionManager.getLatestIndexBy(FailbackLogPositionManager.java:68) at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.findStartPositionInternal(MysqlEventParser.java:424) at com.alibaba.otter.canal.parse.inbound.mysql.MysqlEventParser.findStartPosition(MysqlEventParser.java:366) at com.alibaba.otter.canal.parse.inbound.AbstractEventParser$3.run(AbstractEventParser.java:186) at java.lang.Thread.run(Thread.java:748) Caused by: com.alibaba.fastjson.JSONException: deserialize inet adress error at com.alibaba.fastjson.serializer.MiscCodec.deserialze(MiscCodec.java:316) at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:642) at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:610) at com.alibaba.fastjson.serializer.MiscCodec.deserialze(MiscCodec.java:189) at com.alibaba.fastjson.parser.deserializer.FastjsonASMDeserializer_3_LogIdentity.deserialze(Unknown Source) at com.alibaba.fastjson.parser.deserializer.JavaBeanDeserializer.deserialze(JavaBeanDeserializer.java:184) at com.alibaba.fastjson.parser.deserializer.FastjsonASMDeserializer_2_LogPosition.deserialze(Unknown Source) at com.alibaba.fastjson.parser.deserializer.JavaBeanDeserializer.deserialze(JavaBeanDeserializer.java:184) at com.alibaba.fastjson.parser.deserializer.JavaBeanDeserializer.deserialze(JavaBeanDeserializer.java:557) at com.alibaba.fastjson.parser.deserializer.JavaBeanDeserializer.deserialze(JavaBeanDeserializer.java:188) at com.alibaba.fastjson.parser.deserializer.JavaBeanDeserializer.deserialze(JavaBeanDeserializer.java:184) at com.alibaba.fastjson.parser.DefaultJSONParser.parseObject(DefaultJSONParser.java:642) at com.alibaba.fastjson.JSON.parseObject(JSON.java:350) at com.alibaba.fastjson.JSON.parseObject(JSON.java:318) at com.alibaba.fastjson.JSON.parseObject(JSON.java:281) at com.alibaba.fastjson.JSON.parseObject(JSON.java:381) at com.alibaba.fastjson.JSON.parseObject(JSON.java:361) at com.alibaba.otter.canal.common.utils.JsonUtils.unmarshalFromByte(JsonUtils.java:36) at com.alibaba.otter.canal.meta.ZooKeeperMetaManager.getCursor(ZooKeeperMetaManager.java:153) at com.alibaba.otter.canal.meta.PeriodMixedMetaManager$3.apply(PeriodMixedMetaManager.java:65) at com.alibaba.otter.canal.meta.PeriodMixedMetaManager$3.apply(PeriodMixedMetaManager.java:62) at com.google.common.collect.ComputingConcurrentHashMap$ComputingValueReference.compute(ComputingConcurrentHashMap.java:356) at com.google.common.collect.ComputingConcurrentHashMap$ComputingSegment.compute(ComputingConcurrentHashMap.java:182) at com.google.common.collect.ComputingConcurrentHashMap$ComputingSegment.getOrCompute(ComputingConcurrentHashMap.java:151) at com.google.common.collect.ComputingConcurrentHashMap.getOrCompute(ComputingConcurrentHashMap.java:67) at com.google.common.collect.MapMaker$ComputingMapAdapter.get(MapMaker.java:885) ... 8 more Caused by: java.net.UnknownHostException: mysql-uat-new-mysqlha-readonly.zkh-uat.svc.cluster.local: Name or service not known at java.net.Inet4AddressImpl.lookupAllHostAddr(Native Method) at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928) at java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323) at java.net.InetAddress.getAllByName0(InetAddress.java:1276) at java.net.InetAddress.getAllByName(InetAddress.java:1192) at java.net.InetAddress.getAllByName(InetAddress.java:1126) at java.net.InetAddress.getByName(InetAddress.java:1076) at com.alibaba.fastjson.serializer.MiscCodec.deserialze(MiscCodec.java:314) ... 33 more ]
canal和otter共用一個zk,在/otter/canal/destinations/ 節點上會復用的

otter中關於canal在zk上的標記:
com.alibaba.otter.shared.arbitrate.impl.ArbitrateViewServiceImpl
/** * 查詢當前的仲裁器的一些運行狀態視圖 * * @author jianghang 2011-9-27 下午05:27:38 * @version 4.0.0 */ public class ArbitrateViewServiceImpl implements ArbitrateViewService { private static final String CANAL_PATH = "/otter/canal/destinations/%s"; private static final String CANAL_DATA_PATH = CANAL_PATH + "/%s"; private static final String CANAL_CURSOR_PATH = CANAL_PATH + "/%s/cursor"; private ZkClientx zookeeper = ZooKeeperClient.getInstance(); public MainStemEventData mainstemData(Long channelId, Long pipelineId) { String path = ManagePathUtils.getMainStem(channelId, pipelineId); try { byte[] bytes = zookeeper.readData(path); return JsonUtils.unmarshalFromByte(bytes, MainStemEventData.class); } catch (ZkException e) { return null; } } public Long getNextProcessId(Long channelId, Long pipelineId) { String processRoot = ManagePathUtils.getProcessRoot(channelId, pipelineId); IZkConnection connection = zookeeper.getConnection(); // zkclient會將獲取stat信息和正常的操作分開,使用原生的zk進行優化 ZooKeeper orginZk = ((ZooKeeperx) connection).getZookeeper(); Stat processParentStat = new Stat(); // 獲取所有的process列表 try { orginZk.getChildren(processRoot, false, processParentStat); return (Long) ((processParentStat.getCversion() + processParentStat.getNumChildren()) / 2L); } catch (Exception e) { return -1L; } } public List<ProcessStat> listProcesses(Long channelId, Long pipelineId) { List<ProcessStat> processStats = new ArrayList<ProcessStat>(); String processRoot = ManagePathUtils.getProcessRoot(channelId, pipelineId); IZkConnection connection = zookeeper.getConnection(); // zkclient會將獲取stat信息和正常的操作分開,使用原生的zk進行優化 ZooKeeper orginZk = ((ZooKeeperx) connection).getZookeeper(); // 獲取所有的process列表 List<String> processNodes = zookeeper.getChildren(processRoot); List<Long> processIds = new ArrayList<Long>(); for (String processNode : processNodes) { processIds.add(ManagePathUtils.getProcessId(processNode)); } Collections.sort(processIds); for (int i = 0; i < processIds.size(); i++) { Long processId = processIds.get(i); // 當前的process可能會有變化 ProcessStat processStat = new ProcessStat(); processStat.setPipelineId(pipelineId); processStat.setProcessId(processId); List<StageStat> stageStats = new ArrayList<StageStat>(); processStat.setStageStats(stageStats); try { String processPath = ManagePathUtils.getProcess(channelId, pipelineId, processId); Stat zkProcessStat = new Stat(); List<String> stages = orginZk.getChildren(processPath, false, zkProcessStat); Collections.sort(stages, new StageComparator()); StageStat prev = null; for (String stage : stages) {// 循環每個process下的stage String stagePath = processPath + "/" + stage; Stat zkStat = new Stat(); StageStat stageStat = new StageStat(); stageStat.setPipelineId(pipelineId); stageStat.setProcessId(processId); byte[] bytes = orginZk.getData(stagePath, false, zkStat); if (bytes != null && bytes.length > 0) { // 特殊處理zookeeper里的data信息,manager沒有對應node中PipeKey的對象,所以導致反序列化會失敗,需要特殊處理,刪除'@'符號 String json = StringUtils.remove(new String(bytes, "UTF-8"), '@'); EtlEventData data = JsonUtils.unmarshalFromString(json, EtlEventData.class); stageStat.setNumber(data.getNumber()); stageStat.setSize(data.getSize()); Map exts = new HashMap(); if (!CollectionUtils.isEmpty(data.getExts())) { exts.putAll(data.getExts()); } exts.put("currNid", data.getCurrNid()); exts.put("nextNid", data.getNextNid()); exts.put("desc", data.getDesc()); stageStat.setExts(exts); } if (prev != null) {// 對應的start時間為上一個節點的結束時間 stageStat.setStartTime(prev.getEndTime()); } else { stageStat.setStartTime(zkProcessStat.getMtime()); // process的最后修改時間,select // await成功后會設置USED標志位 } stageStat.setEndTime(zkStat.getMtime()); if (ArbitrateConstants.NODE_SELECTED.equals(stage)) { stageStat.setStage(StageType.SELECT); } else if (ArbitrateConstants.NODE_EXTRACTED.equals(stage)) { stageStat.setStage(StageType.EXTRACT); } else if (ArbitrateConstants.NODE_TRANSFORMED.equals(stage)) { stageStat.setStage(StageType.TRANSFORM); // } else if // (ArbitrateConstants.NODE_LOADED.equals(stage)) { // stageStat.setStage(StageType.LOAD); } prev = stageStat; stageStats.add(stageStat); } // 添加一個當前正在處理的 StageStat currentStageStat = new StageStat(); currentStageStat.setPipelineId(pipelineId); currentStageStat.setProcessId(processId); if (prev == null) { byte[] bytes = orginZk.getData(processPath, false, zkProcessStat); if (bytes == null || bytes.length == 0) { continue; // 直接認為未使用,忽略之 } ProcessNodeEventData nodeData = JsonUtils.unmarshalFromByte(bytes, ProcessNodeEventData.class); if (nodeData.getStatus().isUnUsed()) {// process未使用,直接忽略 continue; // 跳過該process } else { currentStageStat.setStage(StageType.SELECT);// select操作 currentStageStat.setStartTime(zkProcessStat.getMtime()); } } else { // 判斷上一個節點,確定當前的stage StageType stage = prev.getStage(); if (stage.isSelect()) { currentStageStat.setStage(StageType.EXTRACT); } else if (stage.isExtract()) { currentStageStat.setStage(StageType.TRANSFORM); } else if (stage.isTransform()) { currentStageStat.setStage(StageType.LOAD); } else if (stage.isLoad()) {// 已經是最后一個節點了 continue; } currentStageStat.setStartTime(prev.getEndTime());// 開始時間為上一個節點的結束時間 } if (currentStageStat.getStage().isLoad()) {// load必須為第一個process節點 if (i == 0) { stageStats.add(currentStageStat); } } else { stageStats.add(currentStageStat);// 其他情況都添加 } } catch (NoNodeException e) { // ignore } catch (KeeperException e) { throw new ArbitrateException(e); } catch (InterruptedException e) { // ignore } catch (UnsupportedEncodingException e) { // ignore } processStats.add(processStat); } return processStats; } public PositionEventData getCanalCursor(String destination, short clientId) { String path = String.format(CANAL_CURSOR_PATH, destination, String.valueOf(clientId)); try { IZkConnection connection = zookeeper.getConnection(); // zkclient會將獲取stat信息和正常的操作分開,使用原生的zk進行優化 ZooKeeper orginZk = ((ZooKeeperx) connection).getZookeeper(); Stat stat = new Stat(); byte[] bytes = orginZk.getData(path, false, stat); PositionEventData eventData = new PositionEventData(); eventData.setCreateTime(new Date(stat.getCtime())); eventData.setModifiedTime(new Date(stat.getMtime())); eventData.setPosition(new String(bytes, "UTF-8")); return eventData; } catch (Exception e) { return null; } } public void removeCanalCursor(String destination, short clientId) { String path = String.format(CANAL_CURSOR_PATH, destination, String.valueOf(clientId)); zookeeper.delete(path); } @Override public void removeCanal(String destination, short clientId) { String path = String.format(CANAL_DATA_PATH, destination, String.valueOf(clientId)); zookeeper.deleteRecursive(path); } public void removeCanal(String destination) { String path = String.format(CANAL_PATH, destination); zookeeper.deleteRecursive(path); } }
cannal中在zk上的標識:
/** * 存儲結構: * * <pre> * /otter * canal * cluster * destinations * dest1 * running (EPHEMERAL) * cluster * client1 * running (EPHEMERAL) * cluster * filter * cursor * mark * 1 * 2 * 3 * </pre> * * @author zebin.xuzb @ 2012-6-21 * @version 1.0.0 */ public class ZookeeperPathUtils { public static final String ZOOKEEPER_SEPARATOR = "/"; public static final String OTTER_ROOT_NODE = ZOOKEEPER_SEPARATOR + "otter"; public static final String CANAL_ROOT_NODE = OTTER_ROOT_NODE + ZOOKEEPER_SEPARATOR + "canal"; public static final String DESTINATION_ROOT_NODE = CANAL_ROOT_NODE + ZOOKEEPER_SEPARATOR + "destinations"; public static final String FILTER_NODE = "filter"; public static final String BATCH_MARK_NODE = "mark"; public static final String PARSE_NODE = "parse"; public static final String CURSOR_NODE = "cursor"; public static final String RUNNING_NODE = "running"; public static final String CLUSTER_NODE = "cluster"; public static final String DESTINATION_NODE = DESTINATION_ROOT_NODE + ZOOKEEPER_SEPARATOR + "{0}"; public static final String DESTINATION_PARSE_NODE = DESTINATION_NODE + ZOOKEEPER_SEPARATOR + PARSE_NODE; public static final String DESTINATION_CLIENTID_NODE = DESTINATION_NODE + ZOOKEEPER_SEPARATOR + "{1}"; public static final String DESTINATION_CURSOR_NODE = DESTINATION_CLIENTID_NODE + ZOOKEEPER_SEPARATOR + CURSOR_NODE; public static final String DESTINATION_CLIENTID_FILTER_NODE = DESTINATION_CLIENTID_NODE + ZOOKEEPER_SEPARATOR + FILTER_NODE; public static final String DESTINATION_CLIENTID_BATCH_MARK_NODE = DESTINATION_CLIENTID_NODE + ZOOKEEPER_SEPARATOR + BATCH_MARK_NODE; public static final String DESTINATION_CLIENTID_BATCH_MARK_WITH_ID_PATH = DESTINATION_CLIENTID_BATCH_MARK_NODE + ZOOKEEPER_SEPARATOR + "{2}"; /** * 服務端當前正在提供服務的running節點 */ public static final String DESTINATION_RUNNING_NODE = DESTINATION_NODE + ZOOKEEPER_SEPARATOR + RUNNING_NODE; /** * 客戶端當前正在工作的running節點 */ public static final String DESTINATION_CLIENTID_RUNNING_NODE = DESTINATION_CLIENTID_NODE + ZOOKEEPER_SEPARATOR + RUNNING_NODE; /** * 整個canal server的集群列表 */ public static final String CANAL_CLUSTER_ROOT_NODE = CANAL_ROOT_NODE + ZOOKEEPER_SEPARATOR + CLUSTER_NODE; public static final String CANAL_CLUSTER_NODE = CANAL_CLUSTER_ROOT_NODE + ZOOKEEPER_SEPARATOR + "{0}"; /** * 針對某個destination的工作的集群列表 */ public static final String DESTINATION_CLUSTER_ROOT = DESTINATION_NODE + ZOOKEEPER_SEPARATOR + CLUSTER_NODE; public static final String DESTINATION_CLUSTER_NODE = DESTINATION_CLUSTER_ROOT + ZOOKEEPER_SEPARATOR + "{1}"; public static String getDestinationPath(String destinationName) { return MessageFormat.format(DESTINATION_NODE, destinationName); } public static String getClientIdNodePath(String destinationName, short clientId) { return MessageFormat.format(DESTINATION_CLIENTID_NODE, destinationName, String.valueOf(clientId)); } public static String getFilterPath(String destinationName, short clientId) { return MessageFormat.format(DESTINATION_CLIENTID_FILTER_NODE, destinationName, String.valueOf(clientId)); } public static String getBatchMarkPath(String destinationName, short clientId) { return MessageFormat.format(DESTINATION_CLIENTID_BATCH_MARK_NODE, destinationName, String.valueOf(clientId)); } public static String getBatchMarkWithIdPath(String destinationName, short clientId, Long batchId) { return MessageFormat.format(DESTINATION_CLIENTID_BATCH_MARK_WITH_ID_PATH, destinationName, String.valueOf(clientId), getBatchMarkNode(batchId)); } public static String getCursorPath(String destination, short clientId) { return MessageFormat.format(DESTINATION_CURSOR_NODE, destination, String.valueOf(clientId)); } public static String getCanalClusterNode(String node) { return MessageFormat.format(CANAL_CLUSTER_NODE, node); } /** * 服務端當前正在提供服務的running節點 */ public static String getDestinationServerRunning(String destination) { return MessageFormat.format(DESTINATION_RUNNING_NODE, destination); } /** * 客戶端當前正在工作的running節點 */ public static String getDestinationClientRunning(String destination, short clientId) { return MessageFormat.format(DESTINATION_CLIENTID_RUNNING_NODE, destination, String.valueOf(clientId)); } public static String getDestinationClusterNode(String destination, String node) { return MessageFormat.format(DESTINATION_CLUSTER_NODE, destination, node); } public static String getDestinationClusterRoot(String destination) { return MessageFormat.format(DESTINATION_CLUSTER_ROOT, destination); } public static String getParsePath(String destination) { return MessageFormat.format(DESTINATION_PARSE_NODE, destination); } /** * 將batchNode轉換為Long */ public static short getClientId(String clientNode) { return Short.valueOf(clientNode); } /** * 將batchNode轉換為Long */ public static long getBatchMarkId(String batchMarkNode) { return Long.valueOf(batchMarkNode); } /** * 將batchId轉化為zookeeper中的node名稱 */ public static String getBatchMarkNode(Long batchId) { return StringUtils.leftPad(String.valueOf(batchId.intValue()), 10, '0'); } }
com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils
node啟動時連接manager的重試操作:
/** * 通訊交互的client的默認實現實現 * * @author jianghang */ public class DefaultCommunicationClientImpl implements CommunicationClient { private static final Logger logger = LoggerFactory.getLogger(DefaultCommunicationClientImpl.class); private CommunicationConnectionFactory factory = null; private int poolSize = 10; private ExecutorService executor = null; private int retry = 3; private int retryDelay = 1000; private boolean discard = false; public DefaultCommunicationClientImpl(){ } public DefaultCommunicationClientImpl(CommunicationConnectionFactory factory){ this.factory = factory; } public void initial() { RejectedExecutionHandler handler = null; if (discard) { handler = new ThreadPoolExecutor.DiscardPolicy(); } else { handler = new ThreadPoolExecutor.AbortPolicy(); } executor = new ThreadPoolExecutor(poolSize, poolSize, 60 * 1000L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(10 * 1000), new NamedThreadFactory("communication-async"), handler); } public void destory() { executor.shutdown(); } public Object call(final String addr, final Event event) { Assert.notNull(this.factory, "No factory specified"); CommunicationParam params = buildParams(addr); CommunicationConnection connection = null; int count = 0; Throwable ex = null; while (count++ < retry) { try { connection = factory.createConnection(params); return connection.call(event); } catch (Exception e) { logger.error(String.format("call[%s] , retry[%s]", addr, count), e); try { Thread.sleep(count * retryDelay); } catch (InterruptedException e1) { // ignore } ex = e; } finally { if (connection != null) { connection.close(); } } } logger.error("call[{}] failed , event[{}]!", addr, event.toString()); throw new CommunicationException("call[" + addr + "] , Event[" + event.toString() + "]", ex); } public void call(final String addr, final Event event, final Callback callback) { Assert.notNull(this.factory, "No factory specified"); submit(new Runnable() { @Override public void run() { Object obj = call(addr, event); callback.call(obj); } }); } public Object call(final String[] addrs, final Event event) { Assert.notNull(this.factory, "No factory specified"); if (addrs == null || addrs.length == 0) { throw new IllegalArgumentException("addrs example: 127.0.0.1:1099"); } ExecutorCompletionService completionService = new ExecutorCompletionService(executor); List<Future<Object>> futures = new ArrayList<Future<Object>>(addrs.length); List result = new ArrayList(10); for (final String addr : addrs) { futures.add(completionService.submit((new Callable<Object>() { @Override public Object call() throws Exception { return DefaultCommunicationClientImpl.this.call(addr, event); } }))); } Exception ex = null; int errorIndex = 0; while (errorIndex < futures.size()) { try { Future future = completionService.take();// 它也可能被打斷 future.get(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); ex = e; break; } catch (ExecutionException e) { ex = e; break; } errorIndex++; } if (errorIndex < futures.size()) { for (int index = 0; index < futures.size(); index++) { Future<Object> future = futures.get(index); if (future.isDone() == false) { future.cancel(true); } } } else { for (int index = 0; index < futures.size(); index++) { Future<Object> future = futures.get(index); try { result.add(future.get()); } catch (InterruptedException e) { // ignore Thread.currentThread().interrupt(); } catch (ExecutionException e) { // ignore } } } if (ex != null) { throw new CommunicationException(String.format("call addr[%s] error by %s", addrs[errorIndex], ex.getMessage()), ex); } else { return result; } } public void call(final String[] addrs, final Event event, final Callback callback) { Assert.notNull(this.factory, "No factory specified"); if (addrs == null || addrs.length == 0) { throw new IllegalArgumentException("addrs example: 127.0.0.1:1099"); } submit(new Runnable() { @Override public void run() { Object obj = call(addrs, event); callback.call(obj); } }); } /** * 直接提交一個異步任務 */ public Future submit(Runnable call) { Assert.notNull(this.factory, "No factory specified"); return executor.submit(call); } /** * 直接提交一個異步任務 */ public Future submit(Callable call) { Assert.notNull(this.factory, "No factory specified"); return executor.submit(call); } // ===================== helper method ================== private CommunicationParam buildParams(String addr) { CommunicationParam params = new CommunicationParam(); String[] strs = StringUtils.split(addr, ":"); if (strs == null || strs.length != 2) { throw new IllegalArgumentException("addr example: 127.0.0.1:1099"); } InetAddress address = null; try { address = InetAddress.getByName(strs[0]); } catch (UnknownHostException e) { throw new CommunicationException("addr_error", "addr[" + addr + "] is unknow!"); } params.setIp(address.getHostAddress()); params.setPort(Integer.valueOf(strs[1])); return params; } }
com.alibaba.otter.shared.communication.core.impl.DefaultCommunicationClientImpl
2020-08-04 17:48:35.353 [main] ERROR c.a.o.s.c.core.impl.DefaultCommunicationClientImpl - call[otter-manager.pro.svc.cluster.local:1099] , retry[2]
com.alibaba.dubbo.rpc.RpcException: Failed to invoke remote method: acceptEvent, provider: dubbo://172.25.3.187:1099/endpoint?acceptEvent.timeout=50000&client=netty&codec=dubbo&connections=30&iothreads=4&lazy=true&pa
yload=8388608&serialization=java&threads=50, cause: client(url: dubbo://172.25.3.187:1099/endpoint?acceptEvent.timeout=50000&client=netty&codec=dubbo&connections=30&heartbeat=60000&iothreads=4&lazy=true&payload=83886
08&send.reconnect=true&serialization=java&threads=50) failed to connect to server /172.25.3.187:1099, error message is:Connection refused
at com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker.doInvoke(DubboInvoker.java:101) ~[dubbo-2.5.3.jar:2.5.3]
at com.alibaba.dubbo.rpc.protocol.AbstractInvoker.invoke(AbstractInvoker.java:144) ~[dubbo-2.5.3.jar:2.5.3]
at com.alibaba.dubbo.rpc.proxy.InvokerInvocationHandler.invoke(InvokerInvocationHandler.java:52) ~[dubbo-2.5.3.jar:2.5.3]
at com.alibaba.dubbo.common.bytecode.proxy0.acceptEvent(proxy0.java) ~[na:2.5.3]

last position【運行時位點存在哪?】
①destinations,節點讀取目錄,在conf目錄下創建一個目錄,這邊創建的節點名字為vads0
②默認配置是file-instance.xml 這個就是各種信息會使用文件的形式記錄,我選擇使用這邊寫的default-instance.xml ,因為我不想去看文件。default有一行配置將游標記錄在ZK服務上面。
區別就是cursor文件有沒有在ZK上面記錄。


https://blog.csdn.net/u012891996/article/details/83061381
21. 鏈接方式(參考:http://www.importnew.com/25189.html)
1. HA配置架構圖
2. 單連

3. 兩個client+兩個instance+1個mysql
當mysql變動時,兩個client都能獲取到變動
4. 一個server+兩個instance+兩個mysql+兩個client

5. instance****的standby配置

Standby:備庫
22. 總結
這里總結了一下Canal的一些點,僅供參考:
- 原理:模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議;mysql master收到dump請求,開始推送binary log給slave(也就是canal);解析binary log對象(原始為byte流)
- 重復消費問題:在消費端解決。
- 采用開源的open-replicator來解析binlog
- canal需要維護EventStore,可以存取在Memory, File, zk
- canal需要維護客戶端的狀態,同一時刻一個instance只能有一個消費端消費
- 數據傳輸格式:protobuff
- 支持binlog format 類型:statement, row, mixed. 多次附加功能只能在row下使用,比如otter
- binlog position可以支持保存在內存,文件,zk中
- instance啟動方式:rpc/http; 內嵌
- 有ACK機制
- 無告警,無監控,這兩個功能都需要對接外部系統
- 方便快速部署。
23. 我調試成功的代碼地址
https://gitee.com/zhiqishao/canal-client
https://www.cnblogs.com/shaozhiqi/p/11534658.html
