先說下Binlog和canal是什么吧。
1、Binlog是mysql數據庫的操作日志,當有發生增刪改查操作時,就會在data目錄下生成一個log文件,形如mysql-bin.000001,mysql-bin.000002等格式
2、canal是阿里巴巴旗下的一款開源項目,純Java開發。基於數據庫增量日志解析,提供增量數據訂閱&消費,目前主要支持了MySQL(也支持mariaDB);
3、canal起源:早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的數據庫同步業務,主要是基於trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基於數據庫的日志解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。
4、基於日志增量訂閱&消費支持的業務:
(1)數據庫鏡像
(2)數據庫實時備份
(3)多級索引 (賣家和買家各自分庫索引)
(4)search build
(5)業務cache刷新
(6)價格變化等重要業務消息
二、工作原理:
1、mysql主備復制實現:
從上層來看,復制分成三步:
(1)master將改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行查看);
(2)slave將master的binary log events拷貝到它的中繼日志(relay log);
(3)slave重做中繼日志中的事件,將改變反映它自己的數據。
2、canal的工作原理:
原理相對比較簡單:
(1)canal模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議
(2)mysql master收到dump請求,開始推送binary log給slave(也就是canal)
(3)canal解析binary log對象(原始為byte流)
三、主要配置:
1、mysql默認是沒有開啟Binlog,不妨查看下本地mysql是否開啟,可執行:
1 SHOW VARIABLES LIKE 'log_%';
如下圖,是我本地mysql,“on”代表已經開啟,“off”代表關閉
2、如何開啟Binlog:
(1)先進入路徑為C:\ProgramData\MySQL\MySQL Server 5.6的文件夾下(如果沒有,可能是沒有將隱藏的文件夾顯示),而不是這個路徑C:\Program Files\MySQL\MySQL Server 5.6
找到my.ini文件,在打開之前需要先將mysql服務停止,之后在my.ini配置文件中添加以下內容:
1 #添加這一行就ok 2 log-bin=mysql-bin 3 #選擇row模式 4 binlog-format=ROW 5 #配置mysql replaction需要定義,不能和canal的slaveId重復 6 server_id=1 7 8 #指定生成log的數據庫 9 binlog_do_db=springboot_test1(這是指定需要生成log的數據庫,如果刪除這句,則表示所有數據庫都需要生成log) 10 11 log-output=FILE 12 general-log=1 (只需要將0更改為1即可) 13 general_log_file="MYUNYU.log" 14 slow-query-log=1 15 slow_query_log_file="MYUNYU-slow.log" 16 long_query_time=10
配置好之后,再啟動mysql服務,執行查看binlog是否開啟,如果還沒開啟,那就可能是配置出了問題或者mysql版本的問題,這里不詳細說
(2)從節點通過一個專門的賬號連接主節點,這個賬號需要擁有全局的 REPLICATION
權限。我們可以使用GRANT
命令創建這樣的賬號(需要先選中mysql系統庫):
1 CREATE USER canal IDENTIFIED BY 'canal'; 2 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; 3 FLUSH PRIVILEGES;
再查詢mysql系統庫中的user表是否存在canal用戶:
3、配置canal:
首先先下載canal服務端代碼canal.deployer-1.1.1.tar.gz(https://github.com/alibaba/canal/releases),解壓之后,配置文件在conf文件夾下,
進入路徑為C:\...\canal\canal.deployer-1.1.1\conf\example的文件夾,打開配置文件instance.properties,詳細配置如下:
1 ################################################# 2 ## mysql serverId , v1.0.26+ will autoGen
#slaveId不能與my.ini配置文件的server_id一致 3 canal.instance.mysql.slaveId=1234 4 5 # enable gtid use true/false 6 canal.instance.gtidon=false 7 8 # position info 9 canal.instance.master.address=127.0.0.1:3306 10 canal.instance.master.journal.name= 11 canal.instance.master.position= 12 canal.instance.master.timestamp= 13 canal.instance.master.gtid= 14 15 # rds oss binlog 16 canal.instance.rds.accesskey= 17 canal.instance.rds.secretkey= 18 canal.instance.rds.instanceId= 19 20 # table meta tsdb info 21 canal.instance.tsdb.enable=true 22 #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/springboot_test1 23 #canal.instance.tsdb.dbUsername=canal 24 #canal.instance.tsdb.dbPassword=canal 25 26 #canal.instance.standby.address = 27 #canal.instance.standby.journal.name = 28 #canal.instance.standby.position = 29 #canal.instance.standby.timestamp = 30 #canal.instance.standby.gtid= 31 32 # username/password 33 canal.instance.dbUsername=canal 34 canal.instance.dbPassword=canal 35 canal.instance.connectionCharset = UTF-8
#這里配置監控的數據庫名 36 canal.instance.defaultDatabaseName =springboot_test1 37 # enable druid Decrypt database password 38 canal.instance.enableDruid=false 39 #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== 40 41 # table regex 42 canal.instance.filter.regex=.*\\..* 43 # table black regex 44 canal.instance.filter.black.regex= 45 #################################################
再進入路徑為C:\...\canal\canal.deployer-1.1.1\bin的文件夾,雙擊打開startup.bat,如果顯示有以下內容,則表示配置成功,canal服務器端啟動ok:
4、客戶端代碼測試:
(1)首先創建一個空的maven項目,在pom文件中引入canal客戶端的依賴:
1 <dependency> 2 <groupId>com.alibaba.otter</groupId> 3 <artifactId>canal.client</artifactId> 4 <version>1.0.12</version> 5 </dependency>
(2)創建一個類進行測試:
1 package com.xxx; 2 3 import java.net.InetSocketAddress; 4 import java.util.List; 5 6 import com.alibaba.otter.canal.client.CanalConnector; 7 import com.alibaba.otter.canal.protocol.Message; 8 import com.alibaba.otter.canal.protocol.CanalEntry.Column; 9 import com.alibaba.otter.canal.protocol.CanalEntry.Entry; 10 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; 11 import com.alibaba.otter.canal.protocol.CanalEntry.EventType; 12 import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; 13 import com.alibaba.otter.canal.protocol.CanalEntry.RowData; 14 import com.alibaba.otter.canal.client.*; 15 16 public class CanalClient { 17 18 public static void main(String args[]) { 19 // 創建鏈接 instanceA 20 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 21 11111), "example", "", ""); 22 int batchSize = 1000; 23 int emptyCount = 0; 24 try { 25 connector.connect(); 26 connector.subscribe(".*\\..*"); 27 connector.rollback(); 28 int totalEntryCount = 1200; 29 while (emptyCount < totalEntryCount) { 30 Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據 31 long batchId = message.getId(); 32 int size = message.getEntries().size(); 33 if (batchId == -1 || size == 0) { 34 emptyCount++; 35 System.out.println("empty count : " + emptyCount); 36 try { 37 Thread.sleep(1000); 38 } catch (InterruptedException e) { 39 e.printStackTrace(); 40 } 41 } else { 42 emptyCount = 0; 43 printEntry(message.getEntries()); 44 } 45 connector.ack(batchId); // 提交確認 46 } 47 System.out.println("empty too many times, exit"); 48 }catch (Exception e){ 49 //connector.rollback(batchId); // 處理失敗, 回滾數據 50 } 51 finally { 52 connector.disconnect(); 53 } 54 } 55 56 private static void printEntry( List<Entry> entrys) { 57 for (Entry entry : entrys) { 58 if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { 59 continue; 60 } 61 RowChange rowChage = null; 62 try { 63 rowChage = RowChange.parseFrom(entry.getStoreValue()); 64 } catch (Exception e) { 65 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); 66 } 67 68 EventType eventType = rowChage.getEventType(); 69 System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", 70 entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), 71 entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), 72 eventType)); 73 for (RowData rowData : rowChage.getRowDatasList()) { 74 if (eventType == EventType.DELETE) { 75 printColumn(rowData.getBeforeColumnsList()); 76 } else if (eventType == EventType.INSERT) { 77 printColumn(rowData.getAfterColumnsList()); 78 79 } else { 80 System.out.println("-------> before"); 81 printColumn(rowData.getBeforeColumnsList()); 82 System.out.println("-------> after"); 83 printColumn(rowData.getAfterColumnsList()); 84 } 85 } 86 } 87 } 88 89 private static void printColumn( List<Column> columns) { 90 for (Column column : columns) { 91 System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); 92 } 93 } 94 95 }
運行CanalClient的main方法,如果看到控制台出現以下內容則代表連接成功:
再到mysql中創建數據庫springboot_test,springboot_test1,springboot_test2,再在這三個庫中分別創建student表,sql語句為:
CREATE TABLE `student` (
`ID` int(11) NOT NULL AUTO_INCREMENT,
`NAME` varchar(20) NOT NULL,
`CLASS_NAME` varchar(30) NOT NULL,
`CREATE_DATE` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`UPDATE_DATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`ID`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
執行插入語句:
1 INSERT INTO student(NAME , class_name )VALUES('student1' , 'class1');
sql執行完之后再看idea中的控制台,如果出現下面內容打印則表示可以監控數據庫的寫入等操作
到此,使用Binlog和canal來對MySQL的數據寫入進行監控的操作實現完畢!