關於使用Binlog和canal來對MySQL的數據寫入進行監控


先說下Binlog和canal是什么吧。

1、Binlog是mysql數據庫的操作日志,當有發生增刪改查操作時,就會在data目錄下生成一個log文件,形如mysql-bin.000001,mysql-bin.000002等格式

2、canal是阿里巴巴旗下的一款開源項目,純Java開發。基於數據庫增量日志解析,提供增量數據訂閱&消費,目前主要支持了MySQL(也支持mariaDB);

3、canal起源:早期,阿里巴巴B2B公司因為存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的數據庫同步業務,主要是基於trigger的方式獲取增量變更,不過從2010年開始,阿里系公司開始逐步的嘗試基於數據庫的日志解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務,從此開啟了一段新紀元。

4、基於日志增量訂閱&消費支持的業務:

(1)數據庫鏡像

(2)數據庫實時備份

(3)多級索引 (賣家和買家各自分庫索引)

(4)search build

(5)業務cache刷新

(6)價格變化等重要業務消息

二、工作原理:

1、mysql主備復制實現:

從上層來看,復制分成三步:

(1)master將改變記錄到二進制日志(binary log)中(這些記錄叫做二進制日志事件,binary log events,可以通過show binlog events進行查看);

(2)slave將master的binary log events拷貝到它的中繼日志(relay log);

(3)slave重做中繼日志中的事件,將改變反映它自己的數據。

2、canal的工作原理:

原理相對比較簡單:

(1)canal模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議

(2)mysql master收到dump請求,開始推送binary log給slave(也就是canal)

(3)canal解析binary log對象(原始為byte流)

三、主要配置:

1、mysql默認是沒有開啟Binlog,不妨查看下本地mysql是否開啟,可執行:

1 SHOW VARIABLES LIKE 'log_%';

如下圖,是我本地mysql,“on”代表已經開啟,“off”代表關閉

2、如何開啟Binlog:

(1)先進入路徑為C:\ProgramData\MySQL\MySQL Server 5.6的文件夾下(如果沒有,可能是沒有將隱藏的文件夾顯示),而不是這個路徑C:\Program Files\MySQL\MySQL Server 5.6

找到my.ini文件,在打開之前需要先將mysql服務停止,之后在my.ini配置文件中添加以下內容:

 1 #添加這一行就ok  
 2 log-bin=mysql-bin 
 3 #選擇row模式 
 4 binlog-format=ROW  
 5 #配置mysql replaction需要定義,不能和canal的slaveId重復  
 6 server_id=1 
 7 
 8 #指定生成log的數據庫
 9 binlog_do_db=springboot_test1(這是指定需要生成log的數據庫,如果刪除這句,則表示所有數據庫都需要生成log)
10 
11 log-output=FILE
12 general-log=1  (只需要將0更改為1即可)
13 general_log_file="MYUNYU.log"
14 slow-query-log=1
15 slow_query_log_file="MYUNYU-slow.log"
16 long_query_time=10

配置好之后,再啟動mysql服務,執行查看binlog是否開啟,如果還沒開啟,那就可能是配置出了問題或者mysql版本的問題,這里不詳細說

(2)從節點通過一個專門的賬號連接主節點,這個賬號需要擁有全局的 REPLICATION 權限。我們可以使用GRANT 命令創建這樣的賬號(需要先選中mysql系統庫):

1 CREATE USER canal IDENTIFIED BY 'canal';  
2 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
3 FLUSH PRIVILEGES;

再查詢mysql系統庫中的user表是否存在canal用戶:

3、配置canal:

首先先下載canal服務端代碼canal.deployer-1.1.1.tar.gz(https://github.com/alibaba/canal/releases),解壓之后,配置文件在conf文件夾下,

進入路徑為C:\...\canal\canal.deployer-1.1.1\conf\example的文件夾,打開配置文件instance.properties,詳細配置如下:

 1 #################################################
 2 ## mysql serverId , v1.0.26+ will autoGen 
  #slaveId不能與my.ini配置文件的server_id一致
3 canal.instance.mysql.slaveId=1234 4 5 # enable gtid use true/false 6 canal.instance.gtidon=false 7 8 # position info 9 canal.instance.master.address=127.0.0.1:3306 10 canal.instance.master.journal.name= 11 canal.instance.master.position= 12 canal.instance.master.timestamp= 13 canal.instance.master.gtid= 14 15 # rds oss binlog 16 canal.instance.rds.accesskey= 17 canal.instance.rds.secretkey= 18 canal.instance.rds.instanceId= 19 20 # table meta tsdb info 21 canal.instance.tsdb.enable=true 22 #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/springboot_test1 23 #canal.instance.tsdb.dbUsername=canal 24 #canal.instance.tsdb.dbPassword=canal 25 26 #canal.instance.standby.address = 27 #canal.instance.standby.journal.name = 28 #canal.instance.standby.position = 29 #canal.instance.standby.timestamp = 30 #canal.instance.standby.gtid= 31 32 # username/password 33 canal.instance.dbUsername=canal 34 canal.instance.dbPassword=canal 35 canal.instance.connectionCharset = UTF-8
  #這里配置監控的數據庫名 36 canal.instance.defaultDatabaseName =springboot_test1 37 # enable druid Decrypt database password 38 canal.instance.enableDruid=false 39 #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== 40 41 # table regex 42 canal.instance.filter.regex=.*\\..* 43 # table black regex 44 canal.instance.filter.black.regex= 45 #################################################

再進入路徑為C:\...\canal\canal.deployer-1.1.1\bin的文件夾,雙擊打開startup.bat,如果顯示有以下內容,則表示配置成功,canal服務器端啟動ok:

4、客戶端代碼測試:

(1)首先創建一個空的maven項目,在pom文件中引入canal客戶端的依賴:

1     <dependency>
2             <groupId>com.alibaba.otter</groupId>
3             <artifactId>canal.client</artifactId>
4             <version>1.0.12</version>
5         </dependency>

(2)創建一個類進行測試:

 1 package com.xxx;
 2 
 3 import java.net.InetSocketAddress;
 4 import java.util.List;
 5 
 6 import com.alibaba.otter.canal.client.CanalConnector;
 7 import com.alibaba.otter.canal.protocol.Message;
 8 import com.alibaba.otter.canal.protocol.CanalEntry.Column;
 9 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
10 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
11 import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
12 import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
13 import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
14 import com.alibaba.otter.canal.client.*;
15 
16 public class CanalClient {
17 
18     public static void main(String args[]) {
19         // 創建鏈接            instanceA
20         CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",
21                 11111), "example", "", "");
22         int batchSize = 1000;
23         int emptyCount = 0;
24         try {
25             connector.connect();
26             connector.subscribe(".*\\..*");
27             connector.rollback();
28             int totalEntryCount = 1200;
29             while (emptyCount < totalEntryCount) {
30                 Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據
31                 long batchId = message.getId();
32                 int size = message.getEntries().size();
33                 if (batchId == -1 || size == 0) {
34                     emptyCount++;
35                     System.out.println("empty count : " + emptyCount);
36                     try {
37                         Thread.sleep(1000);
38                     } catch (InterruptedException e) {
39                         e.printStackTrace();
40                     }
41                 } else {
42                     emptyCount = 0;
43                     printEntry(message.getEntries());
44                 }
45                 connector.ack(batchId); // 提交確認
46             }
47             System.out.println("empty too many times, exit");
48         }catch (Exception e){
49             //connector.rollback(batchId); // 處理失敗, 回滾數據
50         }
51         finally {
52             connector.disconnect();
53         }
54     }
55 
56     private static void printEntry( List<Entry> entrys) {
57         for (Entry entry : entrys) {
58             if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
59                 continue;
60             }
61             RowChange rowChage = null;
62             try {
63                 rowChage = RowChange.parseFrom(entry.getStoreValue());
64             } catch (Exception e) {
65                 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
66             }
67 
68             EventType eventType = rowChage.getEventType();
69             System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
70                     entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
71                     entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
72                     eventType));
73             for (RowData rowData : rowChage.getRowDatasList()) {
74                 if (eventType == EventType.DELETE) {
75                     printColumn(rowData.getBeforeColumnsList());
76                 } else if (eventType == EventType.INSERT) {
77                     printColumn(rowData.getAfterColumnsList());
78 
79                 } else {
80                     System.out.println("-------> before");
81                     printColumn(rowData.getBeforeColumnsList());
82                     System.out.println("-------> after");
83                     printColumn(rowData.getAfterColumnsList());
84                 }
85             }
86         }
87     }
88 
89     private static void printColumn( List<Column> columns) {
90         for (Column column : columns) {
91             System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
92         }
93     }
94 
95 }

運行CanalClient的main方法,如果看到控制台出現以下內容則代表連接成功:

再到mysql中創建數據庫springboot_test,springboot_test1,springboot_test2,再在這三個庫中分別創建student表,sql語句為:

CREATE TABLE `student` (
`ID` int(11) NOT NULL AUTO_INCREMENT,
`NAME` varchar(20) NOT NULL,
`CLASS_NAME` varchar(30) NOT NULL,
`CREATE_DATE` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
`UPDATE_DATE` timestamp NOT NULL DEFAULT '0000-00-00 00:00:00' ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`ID`) USING BTREE
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;

執行插入語句:

1 INSERT INTO student(NAME , class_name )VALUES('student1' , 'class1');

sql執行完之后再看idea中的控制台,如果出現下面內容打印則表示可以監控數據庫的寫入等操作

到此,使用Binlog和canal來對MySQL的數據寫入進行監控的操作實現完畢!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM