阿里Canal中間件的初步搭建和使用


一、前言

Binlog是MySQL數據庫的二進制日志,用於記錄用戶對數據庫操作的SQL語句(除了數據查詢語句)信息。而Binlog格式也有三種,分別為STATEMENT、ROW、MIXED。STATMENT模式基於SQL語句的復制,每一條會修改數據的SQL語句會記錄。ROW模式除了記錄SQL語句之外,還會記錄每個字段的變化情況,能夠清楚的記錄每行數據的變化歷史,會占用較多的空間。MIXED比較靈活的記錄,當遇到表結構變更的時候,就會記錄為STATMENT模式,當遇到了數據更新或者刪除情況下就會變為ROW模式。Binlog三個用途分別為數據恢復、復制、審計。

Canal是阿里MySQL數據庫Binlog的增量訂閱&消費組件 ,基於數據庫Binlog可以監控數據庫數據的變化,進而用於數據同步等業務。分為Canal Server與Canal Client,前者讀取Binlog解析后存儲,后者連接前者消費。

 

 

二、安裝搭建

1、下載安裝包。並上傳至服務器中。下載地址為:https://github.com/alibaba/canal/releases

 

2、將home文件夾中的壓縮包解壓至安裝路徑(如下圖所示)。

 

1 tar -xzf /home/canal.deployer-1.1.3.tar.gz -C /usr/java/canal

 

3、進入canal文件夾,修改配置文件(如下圖所示)。

1 vi conf/example/instance.properties

 

1 canal.instance.dbUsername=root #數據庫賬號
2 canal.instance.dbPassword=1234 #數據庫密碼
3 canal.instance.defaultDatabaseName = corporate_genealogy #數據庫
4 canal.instance.connectionCharset = UTF-8 #數據庫編碼

 

4、配置MySQL數據庫,開啟Binlog,並選擇模式為ROW(如下圖所示)。

1 vi /etc/my.cnf

 

 

1 #canal
2 log-bin=mysql-bin
3 binlog-format=ROW 
4 server_id=1

 

5、數據庫創建canal用戶,賦予權限,並刷新(如下圖所示)。

ps:這里遇到一個異常信息,是因為數據庫密碼過於簡單,不符合密碼策略,需要修改一下策略。。。

 

 

1 mysql -uroot -p1234
1 SHOW VARIABLES LIKE 'validate_password%';
1 set global validate_password_policy=LOW;
1 set global validate_password_length=4;
1 create user canal identified by 'canal';
1 GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
1 FLUSH PRIVILEGES;

 

6、退出並重啟MySQL。

1 exit;
1 sudo service mysqld restart;

 

7、進入canal的bin文件夾,啟動canal-server。

1 ./startup.sh

 

8、查看logs文件中日志是否啟動成功(如下圖所示)。

 

 

 

 

 

三、客戶端代碼檢測

ps:需要注意的是服務器防火牆需打開對應端口號,這里是11111

1、添加Maven依賴

1 <!-- Canal -->
2 <dependency>
3     <groupId>com.alibaba.otter</groupId>
4     <artifactId>canal.client</artifactId>
5     <version>1.1.3</version>
6 </dependency>

 

2、測試類代碼

 1 import java.net.InetSocketAddress;
 2 import java.util.List;
 3 
 4 import com.alibaba.otter.canal.client.CanalConnector;
 5 import com.alibaba.otter.canal.client.CanalConnectors;
 6 import com.alibaba.otter.canal.protocol.CanalEntry.Column;
 7 import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
 8 import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
 9 import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
10 import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
11 import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
12 import com.alibaba.otter.canal.protocol.Message;
13 
14 public class TestCanal {
15 
16     public static void main(String args[]) {
17         // 創建鏈接
18         CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("服務器IP", 11111),
19                 "example", "", "");
20         int batchSize = 1000;
21         int emptyCount = 0;
22         try {
23             connector.connect();
24             connector.subscribe(".*\\..*");
25             connector.rollback();
26             int totalEmtryCount = 1200;
27             while (emptyCount < totalEmtryCount) {
28                 // 獲取指定數量的數據
29                 Message message = connector.getWithoutAck(batchSize);
30                 long batchId = message.getId();
31                 int size = message.getEntries().size();
32                 if (batchId == -1 || size == 0) {
33                     emptyCount++;
34                     try {
35                         Thread.sleep(1000);
36                     } catch (InterruptedException e) {
37                         e.printStackTrace();
38                     }
39                 } else {
40                     emptyCount = 0;
41                     printEntry(message.getEntries());
42                 }
43                 // 提交確認
44                 connector.ack(batchId);
45             }
46             System.out.println("empty too many times, exit");
47         } finally {
48             connector.disconnect();
49         }
50     }
51 
52     private static void printEntry(List<Entry> entrys) {
53         for (Entry entry : entrys) {
54             if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
55                     || entry.getEntryType() == EntryType.TRANSACTIONEND) {
56                 continue;
57             }
58 
59             RowChange rowChage;
60             try {
61                 rowChage = RowChange.parseFrom(entry.getStoreValue());
62             } catch (Exception e) {
63                 throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
64             }
65 
66             EventType eventType = rowChage.getEventType();
67             System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
68                     entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
69                     entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
70 
71             for (RowData rowData : rowChage.getRowDatasList()) {
72                 if (eventType == EventType.DELETE) {
73                     printColumn(rowData.getBeforeColumnsList());
74                 } else if (eventType == EventType.INSERT) {
75                     printColumn(rowData.getAfterColumnsList());
76                 } else {
77                     System.out.println("-------> before");
78                     printColumn(rowData.getBeforeColumnsList());
79                     System.out.println("-------> after");
80                     printColumn(rowData.getAfterColumnsList());
81                 }
82             }
83         }
84     }
85 
86     private static void printColumn(List<Column> columns) {
87         for (Column column : columns) {
88             System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
89         }
90     }
91 
92 }

 

3、Navicat 連接對應數據庫進行一些添加刪除更新操作,控制台輸出如下圖所示。

 

 

 

 

四、總結展望

考慮到Canal的堆積能力並不強。堆積數據到10W+時,速度會變慢,並會出現假死現象。因此介入消息中間件MQ非常有必要,解決堆積能力問題,可以延后消費,能夠方便的得到積壓數據,進行監控報警。

 

 

 

本文部分學習參考了:https://www.cnblogs.com/java-spring/p/8930740.html

至此是關於介紹在Linux系統中阿里Canal中間件的初步搭建和使用,后續會介紹配合消息中間件等方式處理數據同步及其它業務邏輯。

如有疏漏錯誤之處,還請不吝賜教!

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM