首先要Canal服務端下載:鏈接: https://pan.baidu.com/s/1FwEnqPC1mwNXKRwJuMiLdg 密碼: r8xf
連接數據庫的時候需要給予連接數據庫權限:在my.ini配置文件里加上 log-bin=mysql-bin 這個就行了
連接數據庫的賬號需要授權
CREATE USER cqlpz IDENTIFIED BY 'cqlpz';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cqlpz'@'%';
FLUSH PRIVILEGES; 創建一個賬號並給予權限
解壓出來有4個目錄
2.conf 里面canal.properties 配置canal自己的 比如多個服務連接,canal緩存名稱
這是example里面的內容;
兩個properties就是配置自己的數據庫連接,meta.dat相當於啟動canal服務過后的額緩存信息
3.lib 就是canal需要的jar
4.logs 日志文件
maven引入jar
1 <dependency> 2 <groupId>com.alibaba.otter</groupId> 3 <artifactId>canal.client</artifactId> 4 <version>1.0.25</version> 5 </dependency> 6 <dependency> 7 <groupId>redis.clients</groupId> 8 <artifactId>jedis</artifactId> 9 <version>2.9.0</version> 10 </dependency>
java代碼
1.連接canal服務
// 創建鏈接
CanalConnector connector = CanalConnectors.newSingleConnector(
new InetSocketAddress(AddressUtils.getHostIp(), 端口), "canal名稱(虛擬數據名)", "數據庫賬號", "數據庫密碼");
2.連接某個數據庫
connector.connect();
connector.subscribe("監控的數據庫\\..*"); connector.rollback();
3.獲取操作的事件
Message message = connector.getWithoutAck(數量); // 獲取指定數量的數據
List<Entry> entrys=message.getEntries();
for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN ||entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { System.out.println("獲取數據失敗:"+e.getMessage()); } EventType eventType = rowChage.getEventType(); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType.equals(EventType.DELETE)) { rowData.getBeforeColumnsList();//刪除的所有數據 } else if (eventType.equals(EventType.INSERT)) { rowData.getAfterColumnsList();//添加的所有數據 }else if(eventType.equals(EventType.UPDATE)) { rowData.getAfterColumnsList();//修改的所有數據 } }
完整的java類
public class ClientSample { private static Logger logger = LoggerFactory.getLogger(ClientSample.class); public void startCanalThread(){ Thread thread = new StartCanalThread(); thread.start(); } /** * canal 線程 */ public class StartCanalThread extends Thread { @Override public void run() { // 創建鏈接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("localhost", 11111), "longpizi", "longpizi", "cqlongpizi"); connector.connect(); connector.subscribe("test\\..*"); connector.rollback(); try { while (true) { // 獲取指定數量的數據 Message message = connector.getWithoutAck(1000); long batchId = message.getId(); if (batchId != -1 && message.getEntries().size() > 0) { printEntry(message.getEntries()); } connector.ack(batchId); // 提交確認 Thread.sleep(2000); } }catch (Exception e){ logger.error("Canal線程異常,已終止:"+e.getMessage()); } finally { //中斷Canal連接 connector.disconnect(); } } } /** * 數據庫執行的操作 * @param entrys */ private static void printEntry( List<Entry> entrys) { for (Entry entry : entrys) { //操作事物 忽略 if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null;//執行事件信息 String database=null;//執行的數據庫 String table=null;//執行的表 try { database=entry.getHeader().getSchemaName(); table=entry.getHeader().getTableName(); rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) { logger.error("獲取數據失敗:"+e.getMessage()); } //獲取執行的事件 EventType eventType = rowChage.getEventType(); for (RowData rowData : rowChage.getRowDatasList()) { //刪除操作 if (eventType.equals(EventType.DELETE)) { redisDelete(rowData.getBeforeColumnsList(),database,table); } //添加操作 else if (eventType.equals(EventType.INSERT)) { redisInsert(rowData.getAfterColumnsList(),database,table); } //修改操作 else if(eventType.equals(EventType.UPDATE)) { redisUpdate(rowData.getAfterColumnsList(),database,table); } //修改表結構 else if(eventType.equals(EventType.ALTER)){ logger.info("修改表結構"); } } } } /** * 數據庫執行了添加操作 * @param columns * @param database * @param table */ private static void redisInsert( List<Column> columns,String database,String table){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } System.out.println("數據庫:"+database+"==>表:"+table+"==>添加數據:"+JSON.toJSONString(json)); } /** * 數據庫執行了修改操作 * @param columns * @param database * @param table */ private static void redisUpdate( List<Column> columns,String database,String table){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } System.out.println("數據庫:"+database+"==>表:"+table+"==>修改數據:"+JSON.toJSONString(json)); } /** * 數據庫執行了刪除操作 * @param columns * @param database * @param table */ private static void redisDelete( List<Column> columns,String database,String table){ JSONObject json=new JSONObject(); for (Column column : columns) { json.put(column.getName(), column.getValue()); } System.out.println("數據庫:"+database+"==>表:"+table+"==>刪除數據:"+JSON.toJSONString(json)); } }
這樣就可把操作的數據更新到redis里面了