1.概要
canal 是阿里發布的一個mysql 同步工具,它是模擬 mysql slave 的方式讀取binlog,並可以將數據寫入到隊列中。
如下圖:是官方提供的架構圖。

2.下載CANAL
下載版本為1.1.5

其中
canal.deployer 是canal服務器
canal.admin 是CANAL可視化管理界面
3.配置canal
3.1 配置mysql
創建用戶並授權
create user 'canal'@'%' identified by 'canal';
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%';
配置mysql
# 打開binlog log-bin=mysql-bin # 選擇ROW(行)模式 binlog-format=ROW # 配置MySQL replaction需要定義,不要和canal的slaveId重復 server_id=1
配置后重啟mysql

查看binlog文件列表

查看當前寫入的log文件

3.2 配置canal

編輯文件 conf/example/instance.properties
canal.instance.gtidon=false # mysql地址 canal.instance.master.address=localhost:3306 # mysql 日志文件 canal.instance.master.journal.name=mysql-bin.000001 # 配置日志起始位置,配置為上圖的 position。 canal.instance.master.position=3970 canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true # 用戶名密碼 canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex canal.instance.filter.regex=.*\\..* # table black regex canal.instance.filter.black.regex=mysql\\.slave_.* # mq config canal.mq.topic=example # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* canal.mq.partition=0
配置好后

正常的啟動如下

是否啟動成功,我們可以查看日志數據。

如果是mysql8 可能會報如下錯誤
Canal 1.1.5 啟動報錯:caching_sha2_password Auth failed
這個是mysql 的密碼驗證失敗。
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';
執行這個后,在啟動canal。
3.3 開發java程序讀取同步數據
開發一個springboot程序。
引入jar包。
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.5</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.protocol</artifactId> <version>1.1.5</version> </dependency>
開發編輯代碼如下:
package com.example.canaldemo; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import org.springframework.beans.factory.InitializingBean; import org.springframework.stereotype.Component; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.exception.CanalClientException; import java.net.InetSocketAddress; import java.util.List; @Component public class CannalClient implements InitializingBean { private final static int BATCH_SIZE = 1000; @Override public void afterPropertiesSet() throws Exception { // 創建鏈接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "example", "", ""); try { //打開連接 connector.connect(); //訂閱數據庫表,全部表 connector.subscribe(".*\\..*"); //回滾到未進行ack的地方,下次fetch的時候,可以從最后一個沒有ack的地方開始拿 connector.rollback(); while (true) { // 獲取指定數量的數據 Message message = connector.getWithoutAck(BATCH_SIZE); //獲取批量ID long batchId = message.getId(); //獲取批量的數量 int size = message.getEntries().size(); //如果沒有數據 if (batchId == -1 || size == 0) { try { //線程休眠2秒 Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } else { //如果有數據,處理數據 printEntry(message.getEntries()); } //進行 batch id 的確認。確認之后,小於等於此 batchId 的 Message 都會被確認。 connector.ack(batchId); } } catch (Exception e) { e.printStackTrace(); } finally { connector.disconnect(); } } /** * 打印canal server解析binlog獲得的實體類信息 */ private static void printEntry(List<CanalEntry.Entry> entrys) { for (CanalEntry.Entry entry : entrys) { if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) { //開啟/關閉事務的實體類型,跳過 continue; } //RowChange對象,包含了一行數據變化的所有特征 //比如isDdl 是否是ddl變更操作 sql 具體的ddl sql beforeColumns afterColumns 變更前后的數據字段等等 CanalEntry.RowChange rowChage; try { rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } //獲取操作類型:insert/update/delete類型 CanalEntry.EventType eventType = rowChage.getEventType(); //打印Header信息 System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); //判斷是否是DDL語句 if (rowChage.getIsDdl()) { System.out.println("================》;isDdl: true,sql:" + rowChage.getSql()); } //獲取RowChange對象里的每一行數據,打印出來 for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) { //如果是刪除語句 if (eventType == CanalEntry.EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); //如果是新增語句 } else if (eventType == CanalEntry.EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); //如果是更新的語句 } else { //變更前的數據 System.out.println("------->; before"); printColumn(rowData.getBeforeColumnsList()); //變更后的數據 System.out.println("------->; after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<CanalEntry.Column> columns) { for (CanalEntry.Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }
執行后,我們操作數據庫表,比如刪除數據。

這樣我們就可以通過java程序讀取canal讀取的數據。當然我們可以通過代碼將數據插入到其他的數據庫中。
