1.canal數據格式:
Entry
    Header
        logfileName   [binlog文件名]
        logfileOffset [binlog position]
        executeTime   [binlog中記錄的變更發生時間戳]
        schemaName
        tableName
        eventType     [insert/update/delete類型]
    entryType   [事務頭BEGIN/事務尾END/數據ROWDATA]
    storeValue  [byte數據,可展開,對應的類型為RowChange]
RowChange
    isDdl     [是否是ddl變更操作,比如create table/drop table]
    sql       [具體的ddl sql]
    rowDatas  [具體insert/update/delete的變更數據,可為多條,1個binlog event事件可對應多條變更,比如批處理]
        beforeColumns [Column類型的數組]
        afterColumns  [Column類型的數組]
Column
    index
    sqlType   [jdbc type]
    name      [column name]
    isKey     [是否為主鍵]
    updated   [是否發生過變更]
    isNull    [值是否為null]
    value     [具體的內容,注意為文本]
2.java程序測試
package com.cfang.prebo;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.stream.Collectors;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

/**
 * Minimal canal client sample: connects to a single canal server, polls for
 * batches of binlog entries, prints the row-level changes to standard output
 * and acknowledges each batch once it has been printed.
 */
public class CanalTest {

    /** Address of the canal server to connect to. */
    private static final String SERVER_HOST = "127.0.0.1";
    private static final int SERVER_PORT = 11111;

    /** Canal destination (instance name) to consume from. */
    private static final String DESTINATION = "cfang";

    /** Subscription filter: a regex of the form {@code <anything>.<anything>}. */
    private static final String SUBSCRIBE_FILTER = ".*\\..*";

    /** Maximum number of entries requested per poll. */
    private static final int BATCH_SIZE = 100;

    /** Pause between polls when the server returned nothing. */
    private static final long IDLE_SLEEP_MILLIS = 1000;

    /**
     * Runs the polling loop until the process is killed or an error occurs.
     *
     * @param args unused
     * @throws Exception if the connection fails, the thread is interrupted
     *                   while idle, or a batch cannot be processed; a batch
     *                   that failed processing is rolled back before the
     *                   exception propagates
     */
    public static void main(String[] args) throws Exception {
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress(SERVER_HOST, SERVER_PORT), DESTINATION, "", "");
        connector.connect();
        try {
            connector.subscribe(SUBSCRIBE_FILTER);
            // NOTE(review): presumably rewinds to the last acknowledged position so
            // batches fetched but never acked by a previous run are delivered again.
            connector.rollback();
            while (true) {
                // Fetch up to BATCH_SIZE entries without acknowledging them yet.
                Message message = connector.getWithoutAck(BATCH_SIZE);
                long batchId = message.getId();
                if (batchId == -1 || message.getEntries().isEmpty()) {
                    // Nothing to consume right now; back off before polling again.
                    Thread.sleep(IDLE_SLEEP_MILLIS);
                    continue;
                }
                try {
                    printEntries(message.getEntries());
                } catch (Exception e) {
                    // Processing failed: roll the batch back so it can be fetched again.
                    connector.rollback(batchId);
                    throw e;
                }
                // Consumed successfully: confirm so the server can discard the batch.
                connector.ack(batchId);
            }
        } finally {
            // Always release the server connection, whatever ended the loop.
            connector.disconnect();
        }
    }

    /**
     * Prints every row-data entry in the batch; other entry types are skipped.
     *
     * @param entries entries of one fetched batch
     * @throws Exception if an entry's store value cannot be parsed as a {@link RowChange}
     */
    private static void printEntries(List<Entry> entries) throws Exception {
        for (Entry entry : entries) {
            if (entry.getEntryType() != EntryType.ROWDATA) {
                continue;
            }
            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
            for (RowData rowData : rowChange.getRowDatasList()) {
                switch (eventType) {
                    case INSERT:
                        System.out.println("INSERT ");
                        printColumns(rowData.getAfterColumnsList());
                        break;
                    case UPDATE:
                        System.out.println("UPDATE ");
                        printColumns(rowData.getAfterColumnsList());
                        break;
                    case DELETE:
                        // A deleted row is printed from its before-image columns.
                        System.out.println("DELETE ");
                        printColumns(rowData.getBeforeColumnsList());
                        break;
                    default:
                        // Other event types are not printed.
                        break;
                }
            }
        }
    }

    /**
     * Prints one line per column: name, value and the column's updated flag.
     *
     * @param columns columns of a single row image
     */
    private static void printColumns(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
        }
    }
}
<!-- Maven coordinates of the canal client library -->
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.3</version>
</dependency>