數據同步canal客戶端


1、增量訂閱、消費設計

get/ack/rollback協議介紹:

① Message getWithoutAck(int batchSize),允許指定batchSize,一次可以獲取多條,每次返回的對象為Message,包含的內容為:

  a. batch id 唯一標識
  b. entries 具體的數據對象,對應的數據對象格式:EntryProtocol.proto

② void rollback(long batchId),顧命思議,回滾上次的get請求,重新獲取數據。基於get獲取的batchId進行提交,避免誤操作

③ void ack(long batchId),顧命思議,確認已經消費成功,通知server刪除數據。基於get獲取的batchId進行提交,避免誤操作

2、數據對象格式:EntryProtocol.proto

 1 Entry
 2     Header
 3         logfileName [binlog文件名]
 4         logfileOffset [binlog position]
 5         executeTime [binlog里記錄變更發生的時間戳]
 6         schemaName [數據庫實例]
 7         tableName [表名]
 8         eventType [insert/update/delete類型]
 9     entryType     [事務頭BEGIN/事務尾END/數據ROWDATA]
10     storeValue     [byte數據,可展開,對應的類型為RowChange]
11 RowChange
12 isDdl        [是否是ddl變更操作,比如create table/drop table]
13 sql        [具體的ddl sql]
14 rowDatas    [具體insert/update/delete的變更數據,可為多條,1個binlog event事件可對應多條變更,比如批處理]
15 beforeColumns [Column類型的數組]
16 afterColumns [Column類型的數組]
17 
18 
19 Column
20 index        [column序號]
21 sqlType        [jdbc type]
22 name        [column name]
23 isKey        [是否為主鍵]
24 updated        [是否發生過變更]
25 isNull        [值是否為null]
26 value        [具體的內容,注意為文本]

insert只有after columns, delete只有before columns,而update則會有before / after columns數據.

3、client使用例子

3.1 創建Connector

a. 創建SimpleCanalConnector (直連ip,不支持server/client的failover機制)

1 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), destination, "", "");

b. 創建ClusterCanalConnector (基於zookeeper獲取canal server ip,支持server/client的failover機制)

1 CanalConnector connector = CanalConnectors.newClusterConnector("10.20.144.51:2181", destination, "", "");

c. 創建ClusterCanalConnector (基於固定canal server的地址,支持固定的server ip的failover機制,不支持client的failover機制

1 CanalConnector connector = CanalConnectors.newClusterConnector(Arrays.asList(new InetSocketAddress(AddressUtils.getHostIp(),11111)), destination,"", "");

如上可見,創建client connector的時候需要指定destination,即對應於一個instance,一個數據庫。所以canal client和數據庫是一一對應的關系。

3.2 get/ack/rollback使用

 1 // 創建鏈接
 2 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canal_ip, 11111), destination, canal_username, canal_password);
 3 
 4 try {
 5 
 6     // 連接canal,獲取數據
 7     connector.connect();
 8     connector.subscribe();
 9     connector.rollback();
10     log.info("數據同步工程啟動成功,開始獲取數據");
11     while (true) {
12         
13         // 獲取指定數量的數據
14         Message message = connector.getWithoutAck(1000);
15 
16         // 數據批號
17         long batchId = message.getId();
18 
19         // 獲取該批次數據的數量
20         int size = message.getEntries().size();
21 
22         // 無數據
23         if (batchId == -1 || size == 0) {
24 
25             // 等待1秒后重新獲取
26             try {
27                 Thread.sleep(1000);
28             } catch (InterruptedException e) {
29                 log.error(e);
30                 Thread.currentThread().interrupt();
31             }
32 
33             // 提交確認
34             connector.ack(batchId);
35 
36             // 數據存在,執行方法
37         } else {
38             try {
39 
40           // 處理數據
41                 HandleData.handleEntry(message.getEntries());
42 
43                 // 提交確認
44                 connector.ack(batchId);
45             } catch (KafkaException e) {
46                 log.error(e);
47 
48                 // 處理失敗, 回滾數據
49                 connector.rollback(batchId);
50             } catch (Exception e1) {
51                 log.error(e1);
52 
53                 // 提交確認
54                 connector.ack(batchId);
55             }
56         }
57     }
58 } catch (Exception e) {
59 
60     log.error(e);
61 } finally {
62 
63     // 斷開連接
64     connector.disconnect();
65 }

 處理數據的方法封裝到HandleData類中,且看handleEntry如何處理

 1 // 獲取日志行
 2 RowChange rowChage = null;
 3 try {
 4     rowChage = RowChange.parseFrom(entry.getStoreValue());
 5 } catch (Exception e) {
 6     log.error(e);
 7 }
 8 
 9 // 獲取執行事件類型
10 EventType eventType = rowChage.getEventType();
11 
12 // 日志打印,數據明細
13 log.info(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry
14         .getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader()
15         .getSchemaName(), entry.getHeader().getTableName(), eventType));
16 
17  // 獲取表名
18  String tableName = entry.getHeader().getTableName();
19 
20 
21 // 遍歷日志行,執行任務
22 for (RowData rowData : rowChage.getRowDatasList()) {
23     Map<String, Object> data;
24 
25     // 刪除操作
26     if (eventType == EventType.DELETE) {
27 
28         // 解析數據
29         data = DataUtils.parseData(tableName, "delete", rowData.getBeforeColumnsList());
30 
31         // 插入操作
32     } else if (eventType == EventType.INSERT) {
33 
34         // 解析數據
35         data = DataUtils.parseData(tableName, "insert", rowData.getAfterColumnsList());
36 
37         // 更新操作
38     } else {
39 
40         // 解析數據
41         data = DataUtils.parseData(tableName, "update", rowData.getAfterColumnsList());
42     }
43 
44     // 數據解析成功
45     if (data != null && data.size() > 0) {
46 
47 
48         // 內容轉接json格式發送
49         JSONObject json = JSONObject.fromObject(data);
50         try {
51             Productor.send("canal_" + tableName = "_topic", json.toString(), tableName + "|" + data.get("canal_kafka_key"));
52         } catch (Exception e) {
53             throw new KafkaException("kafka發送異常:" + e);
54         }
55 
56         log.info("數據成功發送kafka");
57     }
58 }

Entry數據被解析成Map格式數據,然后轉為json字符串,發到kafka。為什么要借用消息中間件kafka呢,不用kafka可以嗎?當然可以,直接寫數據同步的邏輯沒有問題。但是如果一個數據用到多個業務場景,勢必導致一個類中有多套同步邏輯,對於后期的維護很不利,多套業務摻雜在一起勢必會互相影響。合理的做法應該是業務隔離,每套業務都能接受到數據變更的消息,然后做自己需要的同步,這樣就需要在數據接受和數據處理形成1對n的關系。消息中間件的消息接受和消費模型正好可以完成這個功能。

一個canal client的消息分發給多個kafka消費者消費。每個kafka消費者代表一種業務場景,架構清晰、利於維護,同時一個kafka消費者可以消費多個canal client的topic。

上面的解析數據邏輯比較簡單,將list解析成map

 1 Map<String, Object> result = new HashMap<String, Object>();
 2 try {
 3     int index = 0;
 4     for (Column column : columns) {
 5         String value = column.getIsNull() ? null : column.getValue();
 6 
 7         // kafka在消息為10K時吞吐量達到最大
 8         if (value != null && value.length() > 10240) {
 9             value = value.substring(0, 10240);
10         }
11         if (index == 0) {
12             result.put("canal_kafka_key", value);
13         }
14         result.put(column.getName(), value);
15         index++;
16     }
17     result.put("operate_type", "delete"||"insert"||"update");
18 } catch (Exception e) {
19     log.error(e);
20 }
21 if (logStr.lastIndexOf(",") == logStr.length() - 1) {
22     logStr = logStr.substring(0, logStr.length() - 1);
23 }
24 return result;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM