1、增量訂閱、消費設計
get/ack/rollback協議介紹:
① Message getWithoutAck(int batchSize),允許指定batchSize,一次可以獲取多條,每次返回的對象為Message,包含的內容為:
a. batch id 唯一標識
b. entries 具體的數據對象,對應的數據對象格式:EntryProtocol.proto
② void rollback(long batchId),顧命思議,回滾上次的get請求,重新獲取數據。基於get獲取的batchId進行提交,避免誤操作
③ void ack(long batchId),顧命思議,確認已經消費成功,通知server刪除數據。基於get獲取的batchId進行提交,避免誤操作
2、數據對象格式:EntryProtocol.proto
1 Entry 2 Header 3 logfileName [binlog文件名] 4 logfileOffset [binlog position] 5 executeTime [binlog里記錄變更發生的時間戳] 6 schemaName [數據庫實例] 7 tableName [表名] 8 eventType [insert/update/delete類型] 9 entryType [事務頭BEGIN/事務尾END/數據ROWDATA] 10 storeValue [byte數據,可展開,對應的類型為RowChange] 11 RowChange 12 isDdl [是否是ddl變更操作,比如create table/drop table] 13 sql [具體的ddl sql] 14 rowDatas [具體insert/update/delete的變更數據,可為多條,1個binlog event事件可對應多條變更,比如批處理] 15 beforeColumns [Column類型的數組] 16 afterColumns [Column類型的數組] 17 18 19 Column 20 index [column序號] 21 sqlType [jdbc type] 22 name [column name] 23 isKey [是否為主鍵] 24 updated [是否發生過變更] 25 isNull [值是否為null] 26 value [具體的內容,注意為文本]
insert只有after columns, delete只有before columns,而update則會有before / after columns數據.
3、client使用例子
3.1 創建Connector
a. 創建SimpleCanalConnector (直連ip,不支持server/client的failover機制)
1 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), destination, "", "");
b. 創建ClusterCanalConnector (基於zookeeper獲取canal server ip,支持server/client的failover機制)
1 CanalConnector connector = CanalConnectors.newClusterConnector("10.20.144.51:2181", destination, "", "");
c. 創建ClusterCanalConnector (基於固定canal server的地址,支持固定的server ip的failover機制,不支持client的failover機制
1 CanalConnector connector = CanalConnectors.newClusterConnector(Arrays.asList(new InetSocketAddress(AddressUtils.getHostIp(),11111)), destination,"", "");
如上可見,創建client connector的時候需要指定destination,即對應於一個instance,一個數據庫。所以canal client和數據庫是一一對應的關系。
3.2 get/ack/rollback使用
1 // 創建鏈接 2 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canal_ip, 11111), destination, canal_username, canal_password); 3 4 try { 5 6 // 連接canal,獲取數據 7 connector.connect(); 8 connector.subscribe(); 9 connector.rollback(); 10 log.info("數據同步工程啟動成功,開始獲取數據"); 11 while (true) { 12 13 // 獲取指定數量的數據 14 Message message = connector.getWithoutAck(1000); 15 16 // 數據批號 17 long batchId = message.getId(); 18 19 // 獲取該批次數據的數量 20 int size = message.getEntries().size(); 21 22 // 無數據 23 if (batchId == -1 || size == 0) { 24 25 // 等待1秒后重新獲取 26 try { 27 Thread.sleep(1000); 28 } catch (InterruptedException e) { 29 log.error(e); 30 Thread.currentThread().interrupt(); 31 } 32 33 // 提交確認 34 connector.ack(batchId); 35 36 // 數據存在,執行方法 37 } else { 38 try { 39 40 // 處理數據 41 HandleData.handleEntry(message.getEntries()); 42 43 // 提交確認 44 connector.ack(batchId); 45 } catch (KafkaException e) { 46 log.error(e); 47 48 // 處理失敗, 回滾數據 49 connector.rollback(batchId); 50 } catch (Exception e1) { 51 log.error(e1); 52 53 // 提交確認 54 connector.ack(batchId); 55 } 56 } 57 } 58 } catch (Exception e) { 59 60 log.error(e); 61 } finally { 62 63 // 斷開連接 64 connector.disconnect(); 65 }
處理數據的方法封裝到HandleData類中,且看handleEntry如何處理
1 // 獲取日志行 2 RowChange rowChage = null; 3 try { 4 rowChage = RowChange.parseFrom(entry.getStoreValue()); 5 } catch (Exception e) { 6 log.error(e); 7 } 8 9 // 獲取執行事件類型 10 EventType eventType = rowChage.getEventType(); 11 12 // 日志打印,數據明細 13 log.info(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry 14 .getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader() 15 .getSchemaName(), entry.getHeader().getTableName(), eventType)); 16 17 // 獲取表名 18 String tableName = entry.getHeader().getTableName(); 19 20 21 // 遍歷日志行,執行任務 22 for (RowData rowData : rowChage.getRowDatasList()) { 23 Map<String, Object> data; 24 25 // 刪除操作 26 if (eventType == EventType.DELETE) { 27 28 // 解析數據 29 data = DataUtils.parseData(tableName, "delete", rowData.getBeforeColumnsList()); 30 31 // 插入操作 32 } else if (eventType == EventType.INSERT) { 33 34 // 解析數據 35 data = DataUtils.parseData(tableName, "insert", rowData.getAfterColumnsList()); 36 37 // 更新操作 38 } else { 39 40 // 解析數據 41 data = DataUtils.parseData(tableName, "update", rowData.getAfterColumnsList()); 42 } 43 44 // 數據解析成功 45 if (data != null && data.size() > 0) { 46 47 48 // 內容轉接json格式發送 49 JSONObject json = JSONObject.fromObject(data); 50 try { 51 Productor.send("canal_" + tableName = "_topic", json.toString(), tableName + "|" + data.get("canal_kafka_key")); 52 } catch (Exception e) { 53 throw new KafkaException("kafka發送異常:" + e); 54 } 55 56 log.info("數據成功發送kafka"); 57 } 58 }
Entry數據被解析成Map格式數據,然后轉為json字符串,發到kafka。為什么要借用消息中間件kafka呢,不用kafka可以嗎?當然可以,直接寫數據同步的邏輯沒有問題。但是如果一個數據用到多個業務場景,勢必導致一個類中有多套同步邏輯,對於后期的維護很不利,多套業務摻雜在一起勢必會互相影響。合理的做法應該是業務隔離,每套業務都能接受到數據變更的消息,然后做自己需要的同步,這樣就需要在數據接受和數據處理形成1對n的關系。消息中間件的消息接受和消費模型正好可以完成這個功能。
一個canal client的消息分發給多個kafka消費者消費。每個kafka消費者代表一種業務場景,架構清晰、利於維護,同時一個kafka消費者可以消費多個canal client的topic。
上面的解析數據邏輯比較簡單,將list解析成map
1 Map<String, Object> result = new HashMap<String, Object>(); 2 try { 3 int index = 0; 4 for (Column column : columns) { 5 String value = column.getIsNull() ? null : column.getValue(); 6 7 // kafka在消息為10K時吞吐量達到最大 8 if (value != null && value.length() > 10240) { 9 value = value.substring(0, 10240); 10 } 11 if (index == 0) { 12 result.put("canal_kafka_key", value); 13 } 14 result.put(column.getName(), value); 15 index++; 16 } 17 result.put("operate_type", "delete"||"insert"||"update"); 18 } catch (Exception e) { 19 log.error(e); 20 } 21 if (logStr.lastIndexOf(",") == logStr.length() - 1) { 22 logStr = logStr.substring(0, logStr.length() - 1); 23 } 24 return result;