Canal Study Notes


How canal works:

The principle is fairly simple:

canal emulates the MySQL slave interaction protocol, masquerading as a MySQL slave, and sends the dump protocol to the MySQL master
the MySQL master receives the dump request and starts pushing binary logs to the slave (that is, canal)
canal parses the binary log objects (raw byte streams)

Architecture

Notes:

a server represents one running canal instance and corresponds to one JVM
an instance corresponds to one data queue (1 server corresponds to 1..n instances)
an instance represents an actual running data pipeline, comprising the EventParser, EventSink, EventStore, and other components.

Instance modules:

eventParser (data-source access; emulates the slave protocol to interact with the master and parses the protocol)
eventSink (the link between Parser and Store; performs data filtering, processing, and distribution)
eventStore (data storage)
metaManager (manager for incremental subscription & consumption metadata)

In short:

MySQL binlogs are stored across multiple files; locating a LogEvent requires both the binlog filename and the binlog position.
By the way they are generated, binlog formats fall into three main types: statement-based, row-based, and mixed.
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)
canal currently supports incremental subscription in all of these modes, but when used for synchronization, ROW mode is generally recommended: statement-based logs carry only the SQL, not the data, so the original changes cannot be recovered from them.

EventParser design
Rough flow:
The overall parsing process breaks down into these steps:

Connection fetches the last successfully parsed position (on first start, it uses the initially configured position or the database's current binlog position)
Connection establishes the link and sends the BINLOG_DUMP command
// 0. write command number
// 1. write 4 bytes bin-log position to start at
// 2. write 2 bytes bin-log flags
// 3. write 4 bytes server id of the slave
// 4. write bin-log file name
MySQL starts pushing the binary log
the received binary log is run through the binlog parser for protocol parsing, which supplements certain information
// supplement column names, column types, primary-key info, and unsigned-type handling
the events are handed to the EventSink module for storage; this is a blocking call that returns only once storage succeeds
after storage succeeds, the binary log position is recorded periodically
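The five commented fields above describe the layout of the BINLOG_DUMP command payload. As a rough, self-contained sketch (not canal's actual code; MySQL's wire protocol is little-endian, and on the wire this payload is additionally wrapped in a length-prefixed packet), the payload could be assembled like this:

```java
import java.io.ByteArrayOutputStream;

// Hypothetical sketch of the BINLOG_DUMP payload layout listed above.
public class BinlogDumpPacket {

    static final int COM_BINLOG_DUMP = 0x12; // MySQL command number for BINLOG_DUMP

    static byte[] build(long binlogPosition, int flags, long slaveServerId, String binlogFileName) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(COM_BINLOG_DUMP);                       // 0. write command number
        writeLittleEndian(out, (int) binlogPosition, 4);  // 1. write 4 bytes bin-log position to start at
        writeLittleEndian(out, flags, 2);                 // 2. write 2 bytes bin-log flags
        writeLittleEndian(out, (int) slaveServerId, 4);   // 3. write 4 bytes server id of the slave
        byte[] name = binlogFileName.getBytes();          // 4. write bin-log file name
        out.write(name, 0, name.length);
        return out.toByteArray();
    }

    // Emit the low `width` bytes of `value`, least significant byte first.
    static void writeLittleEndian(ByteArrayOutputStream out, int value, int width) {
        for (int i = 0; i < width; i++) {
            out.write((value >>> (8 * i)) & 0xff);
        }
    }
}
```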

 

EventSink design
Notes:
data filtering: supports wildcard filter patterns on table names, column contents, and so on
data routing/distribution: solves 1:n (one parser feeding multiple stores)
data merging: solves n:1 (multiple parsers feeding one store)
data processing: extra processing before events enter the store, e.g. joins

EventStore設計
1. 目前僅實現了Memory內存模式,后續計划增加本地file存儲,mixed混合模式
2. 借鑒了Disruptor的RingBuffer的實現思路
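The Disruptor-inspired design can be illustrated with a toy ring buffer. Canal's actual memory store tracks three sequences (put/get/ack); the sketch below keeps only put and get cursors and is purely illustrative, not canal's implementation:

```java
// A minimal ring buffer sketching the Disruptor-style design referenced above.
public class MiniRingBuffer<T> {
    private final Object[] slots;
    private final int mask;          // size must be a power of two so index = seq & mask
    private long putSequence = -1;   // last written sequence
    private long getSequence = -1;   // last consumed sequence

    public MiniRingBuffer(int sizePowerOfTwo) {
        slots = new Object[sizePowerOfTwo];
        mask = sizePowerOfTwo - 1;
    }

    public synchronized boolean tryPut(T event) {
        if (putSequence - getSequence >= slots.length) return false; // buffer full
        slots[(int) (++putSequence & mask)] = event;
        return true;
    }

    @SuppressWarnings("unchecked")
    public synchronized T tryGet() {
        if (getSequence >= putSequence) return null;                 // buffer empty
        return (T) slots[(int) (++getSequence & mask)];
    }
}
```

Because sequences only ever grow, old slots are overwritten in place once consumers advance, which is the core of the Disruptor idea.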

 

HA (High Availability) mechanism design
canal's HA comes in two parts: canal server and canal client each have their own HA implementation.

canal server: to reduce the number of dump requests against MySQL, the instances on different servers must have only one in the running state at a time; the others stay in standby.
canal client: to preserve ordering, a given instance may be driven (get/ack/rollback) by only one canal client at a time; otherwise the order in which the client receives data cannot be guaranteed.

The HA control relies mainly on two ZooKeeper features: watchers and EPHEMERAL nodes (whose lifetime is bound to the session).

Canal Server:

Rough steps:
(1) whenever a canal server wants to start a canal instance, it first makes a startup attempt against ZooKeeper (implementation: create an EPHEMERAL node; whichever server creates it successfully is allowed to start, much like a distributed lock)
(2) after creating the ZooKeeper node successfully, the corresponding canal server starts its canal instance; on the servers that failed to create the node, the instance stays in standby
(3) once ZooKeeper detects that the node created by canal server A has disappeared, it immediately notifies the other canal servers to repeat step (1) and elect a new canal server to start the instance.

Canal Client:
each time a canal client connects, it first asks ZooKeeper which server is currently running the canal instance and then connects to it; once the link becomes unusable, it retries the connect.
The canal client works the same way as the canal server: it, too, uses ZooKeeper EPHEMERAL-node preemption for control.
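Stripped of ZooKeeper, the preemption in steps (1)-(3) boils down to create-if-absent semantics. The in-process sketch below mimics it with a single compareAndSet; in the real implementation the EPHEMERAL node's automatic deletion on session loss is what triggers re-election through watchers:

```java
import java.util.concurrent.atomic.AtomicReference;

// In-process analogy of the running-node preemption: whichever server
// installs its id first "created the node" and becomes active; the rest
// stay in standby. Illustrative only, not canal's actual code.
public class RunningNode {
    private final AtomicReference<String> owner = new AtomicReference<>(null);

    /** Try to become the running server; returns true only for the winner. */
    public boolean tryStart(String serverId) {
        return owner.compareAndSet(null, serverId);
    }

    /** Session loss deletes the ephemeral node; standby servers may then re-elect. */
    public void release(String serverId) {
        owner.compareAndSet(serverId, null);
    }

    public String currentOwner() {
        return owner.get();
    }
}
```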


ephemeral: transient; fleeting

https://github.com/alibaba/canal/wiki/%E7%AE%80%E4%BB%8B#canal%E7%9A%84%E5%B7%A5%E4%BD%9C%E5%8E%9F%E7%90%86

The deleted-binlog problem
Aliyun RDS has its own binlog purge policy, which is more aggressive than on a self-hosted MySQL: by default binlogs are purged after roughly 18 hours and uploaded to OSS. The retention can be adjusted in the console, or the application can download older binlogs from OSS.

Where to configure the AK/SK in canal:

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

Note:
Compared with a plain MySQL configuration, this adds the Aliyun AK/SK/instance-id information needed to fetch RDS binlogs from OSS (if you do not need binlogs to be downloaded from OSS automatically after the local copies are purged past 18 hours, these values can be left empty).

 

canal.instance.rds.accesskey    AK of the Aliyun account
canal.instance.rds.secretkey    SK of the Aliyun account
canal.instance.rds.instanceId   instance id of the Aliyun RDS instance
(all three can be left empty if you do not need binlogs to be fetched from OSS automatically after the local copies are purged)
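Taken together, the relevant part of a hypothetical instance.properties for an RDS source might look like the fragment below (the host name, instance id, and credentials are placeholders):

```properties
# hypothetical instance.properties fragment for an Aliyun RDS source;
# the host, instance id, and credentials below are placeholders
canal.instance.master.address=rm-xxxxxxxx.mysql.rds.aliyuncs.com:3306
# needed only when canal must fetch binlogs already purged to OSS
canal.instance.rds.accesskey=yourAccessKey
canal.instance.rds.secretkey=yourSecretKey
canal.instance.rds.instanceId=rm-xxxxxxxx
```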

https://github.com/alibaba/canal/wiki/aliyun-RDS-QuickStart

 

 

Where to configure the AK/SK in otter:

 

 

The authentication methods of public-cloud APIs: an AK/SK primer

1 Public-cloud API authentication methods
Common authentication methods include the following:
    token authentication
    AK/SK authentication
    RSA asymmetric encryption
    AK/SK is described in detail below.

2 AK/SK
2.1 How AK/SK works

A cloud service verifies the identity of a request's sender by means of signatures computed with an Access Key Id / Secret Access Key.
The Access Key Id (AK) identifies the user; the Secret Access Key (SK) is the key the user uses to sign the authentication string and the cloud vendor uses to verify it, so the SK must be kept secret. AK/SK is based on symmetric cryptography.
2.2 How AK/SK is used

After receiving a user's request, the system uses the SK that corresponds to the AK, and the same signing mechanism, to generate an authentication string, and compares it with the authentication string included in the request. If the strings match, the system considers the user authorized for the operation and performs it; if they differ, it ignores the operation and returns an error code.
2.3 Flow
    Check whether the request contains an Authorization string; if it does, continue with the next step.
    From the HTTP request contents, generate a Signature string using the same algorithm.
    Compare the server-generated Signature with the one supplied by the user: if they differ, authentication fails and the request is rejected; if they match, authentication succeeds and the request is processed.

In essence:
  Client:
    1. build the HTTP request (including the access key);
    2. compute a signature over the request contents using the secret access key;
    3. send the request to the server.
  Server:
    1. look up the corresponding secret key in the database by the access key received;
    2. compute the signature over the request contents with that secret key, using the same algorithm as client step 2;
    3. compare the client's signature with the server-computed one; if they match, authentication passes, otherwise it fails.
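These steps can be sketched with HMAC-SHA256, a typical choice for this kind of symmetric signing; each real cloud API defines its own canonical request string and algorithm, so this is only an illustration:

```java
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.nio.charset.StandardCharsets;

// Minimal AK/SK-style signing sketch: both sides derive the signature from
// the request contents and the shared SK, then compare the results.
public class AkSkSigner {

    // Client step 2: compute a hex HMAC-SHA256 signature over the request contents.
    static String sign(String requestContents, String secretKey) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secretKey.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            byte[] digest = mac.doFinal(requestContents.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // Server steps 2-3: recompute with the SK looked up by AK, then compare.
    static boolean verify(String requestContents, String secretKey, String signature) {
        return sign(requestContents, secretKey).equals(signature);
    }
}
```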
Token
Token authentication
With token authentication, the user first obtains a token, then adds an "X-XXX-Token" header to each business API request.
Flow
    Send a request to obtain the IAM endpoint and the region name in the message body.
    Obtain the token: after the request succeeds, the value of the "X-YYY-Token" response header is the token.
    Call the business API with an "X-XXX-Token" request header whose value is the token obtained in step 2.

https://blog.csdn.net/makenothing/java/article/details/81158481

 

2020-05-29 17:06:25.217 ERROR [bootstrap,,,] 12 --- [37.225.101:2181] - [] org.I0Itec.zkclient.ZkEventThread        : Error handling event ZkEvent[Data of /otter/canal/destinations/zaf-security-admin/1001/running changed sent to com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor$1@731b42f4]
com.alibaba.otter.canal.protocol.exception.CanalClientException: something goes wrong in initRunning method. 
    at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor.initRunning(ClientRunningMonitor.java:156) ~[canal.client-1.1.3.jar!/:na]
    at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor$1.handleDataDeleted(ClientRunningMonitor.java:72) ~[canal.client-1.1.3.jar!/:na]
    at org.I0Itec.zkclient.ZkClient$9.run(ZkClient.java:825) ~[zkclient-0.10.jar!/:na]
    at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:72) ~[zkclient-0.10.jar!/:na]
Caused by: com.alibaba.otter.canal.protocol.exception.CanalClientException: java.io.IOException: end of stream when reading header
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:189) ~[canal.client-1.1.3.jar!/:na]
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.access$000(SimpleCanalConnector.java:50) ~[canal.client-1.1.3.jar!/:na]
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector$1.processActiveEnter(SimpleCanalConnector.java:422) ~[canal.client-1.1.3.jar!/:na]
    at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor.processActiveEnter(ClientRunningMonitor.java:221) ~[canal.client-1.1.3.jar!/:na]
    at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor.initRunning(ClientRunningMonitor.java:123) ~[canal.client-1.1.3.jar!/:na]
    ... 3 common frames omitted
Caused by: java.io.IOException: end of stream when reading header
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.read(SimpleCanalConnector.java:404) ~[canal.client-1.1.3.jar!/:na]
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.readNextPacket(SimpleCanalConnector.java:392) ~[canal.client-1.1.3.jar!/:na]
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.readNextPacket(SimpleCanalConnector.java:376) ~[canal.client-1.1.3.jar!/:na]
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.doConnect(SimpleCanalConnector.java:151) ~[canal.client-1.1.3.jar!/:na]
    ... 7 common frames omitted

Issue 1:

ERROR c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - dump address /192.168.1.50:3306 has an error, retrying. caused by

com.alibaba.otter.canal.parse.exception.CanalParseException: can't find start position for example

Cause: the position saved in meta.dat is inconsistent with the database's position (MySQL no longer retains binlogs at that position), so canal cannot capture the database's changes.

Fix: delete meta.dat and restart canal; problem solved.

Cluster operation: in the ZooKeeper ensemble used by canal, delete the node /otter/canal/destinations/xxxxx/1001/cursor, then restart canal to recover.

 

Issue 2:

java.lang.OutOfMemoryError: Java heap space
 

The canal consumer had been down for too long, and the position stored at the ZooKeeper node

/otter/canal/destinations/test_db/1001/cursor was from long ago; on restart, canal started consuming from that very old position, and the canal server's heap blew up.

Watching database changes yields only TransactionBegin/TransactionEnd events, with no data-carrying EventType:

this may be caused by canal.instance.filter.black.regex=.*\\..*; change it to canal.instance.filter.black.regex= (empty) and restart.

 

Issue 3:

ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:fdyb_db[com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.google.common.collect.ComputationException: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table meta:`mysql`.`pds_4490277`
Caused by: com.google.common.collect.ComputationException: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table meta:`mysql`.`pds_4490277`

Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: fetch failed by table meta:`mysql`.`pds_4490277`
Caused by: java.io.IOException: ErrorPacket [errorNumber=1142, fieldCount=-1, message=SELECT command denied to user 'cy_canal'@'11.217.0.224' for table 'pds_4490277', sqlState=42000, sqlStateMarker=#]
with command: desc `mysql`.`pds_4490277`

Analysis: the MySQL system tables require elevated privileges; canal failed to fetch the table's metadata (the desc query was denied), so the binlog position could not advance.

Fix: add all tables under the mysql schema to the blacklist: canal.instance.filter.black.regex = mysql\\..* ; after this change the canal cluster recovers without a restart.

Other notes: check whether the CanalConnector calls subscribe(filter);
if it does, the filter must match canal.instance.filter.regex in instance.properties, because the subscribe filter overrides the instance configuration; if the subscribe filter is .*\\..*, you are effectively consuming every change in every database.

 

Issue 4:

Symptom: after the database is modified, the canal application never sees the binlog, and data is not consumed or processed.

Diagnosis:
1. checked the logs of the canal server, the canal application, and the ZooKeeper servers: nothing abnormal;
2. checked MySQL and the ES servers: nothing abnormal;
3. checked the configuration of the canal server and the canal application, and found a problem in the canal server's canal.properties.

Cause: canal.properties configured both canal.ip and canal.zkServers. In a ZooKeeper-cluster deployment, setting canal.ip makes connections go to the canal server by IP first, which defeats the ZooKeeper coordination, and the position is then saved to a local file.
Once that local position file goes wrong, no component logs an error, and the problem becomes very hard to track down.

Fix: clear the canal.ip setting, shut down the canal server and the canal application, delete the nodes on ZooKeeper, then restart the canal server and application; problem solved.
https://blog.csdn.net/my201110lc/article/details/77885720


Issue 5:

2020-10-26 14:02:52.510 [destination = ddbuat , address = hym-mysql-uat-cluster-private.rwlb.rds.aliyuncs.com/10.10.3.3:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.006381,position=356075532,serverId=14457169,gtid=,timestamp=1603682791000] cost : 9ms , the next step is binlog dump
2020-10-26 14:02:52.520 [MultiStageCoprocessor-Parser-ddbuat-6] ERROR com.alibaba.otter.canal.common.utils.NamedThreadFactory - from MultiStageCoprocessor-Parser-ddbuat-6
com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table:db.hymnews,33 vs 25
2020-10-26 14:02:52.523 [destination = ddbuat , address = hym-mysql-uat-cluster-private.rwlb.rds.aliyuncs.com/10.10.3.3:3306 , EventParser] ERROR c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - dump address hym-mysql-uat-cluster-private.rwlb.rds.aliyuncs.com/10.10.3.3:3306 has an error, retrying. caused by 
com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table:db.hymnews,33 vs 25
2020-10-26 14:02:52.525 [MultiStageCoprocessor-Parser-ddbuat-11] ERROR com.alibaba.otter.canal.common.utils.NamedThreadFactory - from MultiStageCoprocessor-Parser-ddbuat-11
com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table:db.hymnews,33 vs 25
2020-10-26 14:02:52.525 [destination = ddbuat , address = hym-mysql-uat-cluster-private.rwlb.rds.aliyuncs.com/10.10.3.3:3306 , EventParser] ERROR com.alibaba.otter.canal.common.alarm.LogAlarmHandler - destination:ddbuat[com.alibaba.otter.canal.parse.exception.CanalParseException: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: parse row data failed.
Caused by: com.alibaba.otter.canal.parse.exception.CanalParseException: column size is not match for table:db.hymnews,33 vs 25

Cause: tsdb was enabled, so a snapshot of the table schema had been stored.
Fix:
delete the stored snapshot data; canal will then re-store the current schema of the database, and the error above disappears.

delete from meta_snapshot where destination='the failing destination'

The configured tsdb database:

# table meta tsdb info
canal.instance.tsdb.enable=true
canal.instance.tsdb.url=jdbc:mysql://rm-xxx.mysql.rds.aliyuncs.com:3306/canal_tsdb?useSSL=false
canal.instance.tsdb.dbUsername=admin
canal.instance.tsdb.dbPassword=pwd
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

 



canal's storage layout in ZooKeeper:

/**
 * Storage layout:
 * 
 * <pre>
 * /otter
 *    canal
 *      cluster
 *      destinations
 *        dest1
 *          running (EPHEMERAL) 
 *          cluster
 *          client1
 *            running (EPHEMERAL)
 *            cluster
 *            filter
 *            cursor
 *            mark
 *              1
 *              2
 *              3
 * </pre>
 * 
 * @author zebin.xuzb @ 2012-6-21
 * @version 1.0.0
 */
import java.text.MessageFormat;

import org.apache.commons.lang.StringUtils;

public class ZookeeperPathUtils {

    public static final String ZOOKEEPER_SEPARATOR                          = "/";

    public static final String OTTER_ROOT_NODE                              = ZOOKEEPER_SEPARATOR + "otter";

    public static final String CANAL_ROOT_NODE                              = OTTER_ROOT_NODE + ZOOKEEPER_SEPARATOR
                                                                              + "canal";

    public static final String DESTINATION_ROOT_NODE                        = CANAL_ROOT_NODE + ZOOKEEPER_SEPARATOR
                                                                              + "destinations";

    public static final String FILTER_NODE                                  = "filter";

    public static final String BATCH_MARK_NODE                              = "mark";

    public static final String PARSE_NODE                                   = "parse";

    public static final String CURSOR_NODE                                  = "cursor";

    public static final String RUNNING_NODE                                 = "running";

    public static final String CLUSTER_NODE                                 = "cluster";

    public static final String DESTINATION_NODE                             = DESTINATION_ROOT_NODE
                                                                              + ZOOKEEPER_SEPARATOR + "{0}";

    public static final String DESTINATION_PARSE_NODE                       = DESTINATION_NODE + ZOOKEEPER_SEPARATOR
                                                                              + PARSE_NODE;

    public static final String DESTINATION_CLIENTID_NODE                    = DESTINATION_NODE + ZOOKEEPER_SEPARATOR
                                                                              + "{1}";

    public static final String DESTINATION_CURSOR_NODE                      = DESTINATION_CLIENTID_NODE
                                                                              + ZOOKEEPER_SEPARATOR + CURSOR_NODE;

    public static final String DESTINATION_CLIENTID_FILTER_NODE             = DESTINATION_CLIENTID_NODE
                                                                              + ZOOKEEPER_SEPARATOR + FILTER_NODE;

    public static final String DESTINATION_CLIENTID_BATCH_MARK_NODE         = DESTINATION_CLIENTID_NODE
                                                                              + ZOOKEEPER_SEPARATOR + BATCH_MARK_NODE;

    public static final String DESTINATION_CLIENTID_BATCH_MARK_WITH_ID_PATH = DESTINATION_CLIENTID_BATCH_MARK_NODE
                                                                              + ZOOKEEPER_SEPARATOR + "{2}";

    /**
     * running node of the server currently providing service
     */
    public static final String DESTINATION_RUNNING_NODE                     = DESTINATION_NODE + ZOOKEEPER_SEPARATOR
                                                                              + RUNNING_NODE;

    /**
     * running node of the client currently at work
     */
    public static final String DESTINATION_CLIENTID_RUNNING_NODE            = DESTINATION_CLIENTID_NODE
                                                                              + ZOOKEEPER_SEPARATOR + RUNNING_NODE;

    /**
     * list of all canal servers in the cluster
     */
    public static final String CANAL_CLUSTER_ROOT_NODE                      = CANAL_ROOT_NODE + ZOOKEEPER_SEPARATOR
                                                                              + CLUSTER_NODE;

    public static final String CANAL_CLUSTER_NODE                           = CANAL_CLUSTER_ROOT_NODE
                                                                              + ZOOKEEPER_SEPARATOR + "{0}";

    /**
     * list of servers working on a given destination
     */
    public static final String DESTINATION_CLUSTER_ROOT                     = DESTINATION_NODE + ZOOKEEPER_SEPARATOR
                                                                              + CLUSTER_NODE;
    public static final String DESTINATION_CLUSTER_NODE                     = DESTINATION_CLUSTER_ROOT
                                                                              + ZOOKEEPER_SEPARATOR + "{1}";

    public static String getDestinationPath(String destinationName) {
        return MessageFormat.format(DESTINATION_NODE, destinationName);
    }

    public static String getClientIdNodePath(String destinationName, short clientId) {
        return MessageFormat.format(DESTINATION_CLIENTID_NODE, destinationName, String.valueOf(clientId));
    }

    public static String getFilterPath(String destinationName, short clientId) {
        return MessageFormat.format(DESTINATION_CLIENTID_FILTER_NODE, destinationName, String.valueOf(clientId));
    }

    public static String getBatchMarkPath(String destinationName, short clientId) {
        return MessageFormat.format(DESTINATION_CLIENTID_BATCH_MARK_NODE, destinationName, String.valueOf(clientId));
    }

    public static String getBatchMarkWithIdPath(String destinationName, short clientId, Long batchId) {
        return MessageFormat.format(DESTINATION_CLIENTID_BATCH_MARK_WITH_ID_PATH,
            destinationName,
            String.valueOf(clientId),
            getBatchMarkNode(batchId));
    }

    public static String getCursorPath(String destination, short clientId) {
        return MessageFormat.format(DESTINATION_CURSOR_NODE, destination, String.valueOf(clientId));
    }

    public static String getCanalClusterNode(String node) {
        return MessageFormat.format(CANAL_CLUSTER_NODE, node);
    }

    /**
     * running node of the server currently providing service
     */
    public static String getDestinationServerRunning(String destination) {
        return MessageFormat.format(DESTINATION_RUNNING_NODE, destination);
    }

    /**
     * running node of the client currently at work
     */
    public static String getDestinationClientRunning(String destination, short clientId) {
        return MessageFormat.format(DESTINATION_CLIENTID_RUNNING_NODE, destination, String.valueOf(clientId));
    }

    public static String getDestinationClusterNode(String destination, String node) {
        return MessageFormat.format(DESTINATION_CLUSTER_NODE, destination, node);
    }

    public static String getDestinationClusterRoot(String destination) {
        return MessageFormat.format(DESTINATION_CLUSTER_ROOT, destination);
    }

    public static String getParsePath(String destination) {
        return MessageFormat.format(DESTINATION_PARSE_NODE, destination);
    }

    /**
     * Convert a client node name into a short clientId
     */
    public static short getClientId(String clientNode) {
        return Short.valueOf(clientNode);
    }

    /**
     * Convert a batchMark node name into a long batchId
     */
    public static long getBatchMarkId(String batchMarkNode) {
        return Long.valueOf(batchMarkNode);
    }

    /**
     * Convert a batchId into the node name used in zookeeper
     */
    public static String getBatchMarkNode(Long batchId) {
        return StringUtils.leftPad(String.valueOf(batchId.intValue()), 10, '0');
    }
}

com.alibaba.otter.canal.common.zookeeper.ZookeeperPathUtils
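The constants above expand into concrete paths via MessageFormat. A small standalone sketch (no canal jar needed; "example" and 1001 are made-up destination and client ids, and the patterns are copied from the constants above):

```java
import java.text.MessageFormat;

// Reproduce the two paths referenced most often in the troubleshooting notes.
public class ZkPathDemo {

    // Pattern of DESTINATION_RUNNING_NODE above.
    static String serverRunningPath(String destination) {
        return MessageFormat.format("/otter/canal/destinations/{0}/running", destination);
    }

    // Pattern of DESTINATION_CURSOR_NODE above; the clientId is converted to a
    // String first, as the original class does, to avoid MessageFormat's
    // number grouping ("1,001").
    static String clientCursorPath(String destination, short clientId) {
        return MessageFormat.format("/otter/canal/destinations/{0}/{1}/cursor",
            destination, String.valueOf(clientId));
    }
}
```

The cursor path is the node that is deleted in the first troubleshooting case above to reset a stale position.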

 

