canal


canal簡介

canal可以用來監控數據庫數據的變化,從而獲得新增數據,或者修改的數據。

canal是應對阿里巴巴存在杭州和美國的雙機房部署,存在跨機房同步的業務需求而提出的。

阿里系公司開始逐步的嘗試基於數據庫的日志解析,獲取增量變更進行同步,由此衍生出了增量訂閱&消費的業務。

原理相對比較簡單:

  1. canal模擬mysql slave的交互協議,偽裝自己為mysql slave,向mysql master發送dump協議
  2. mysql master收到dump請求,開始推送binary log給slave(也就是canal)
  3. canal解析binary log對象(原始為byte流)

環境部署

mysql開啟binlog模式

(1)查看當前mysql是否開啟binlog模式。

SHOW VARIABLES LIKE '%log_bin%'

如果log_bin的值為OFF是未開啟,為ON是已開啟。

(2)修改/etc/my.cnf 需要開啟binlog模式。

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

修改完成之后,重啟mysqld的服務。

(3) 進入mysql

mysql -h localhost -u root -p

(4)創建賬號 用於測試使用

使用root賬號創建用戶並授予權限

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

canal服務端安裝配置

(1)下載地址canal

https://github.com/alibaba/canal/releases/tag/canal-1.0.24

(2)下載之后 上傳到linux系統中,解壓縮到指定的目錄/usr/local/canal

解壓縮之后的目錄結構如下:

(3)修改 exmaple下的實例配置

vi conf/example/instance.properties

修改如圖所示的幾個參數。

(3)指定讀取位置

進入mysql中執行下面語句查看binlog所在位置

mysql> show master status;
顯示如下:
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000001 |      120 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)

如果file中binlog文件不為 mysql-bin.000001 可以重置mysql

mysql> reset master;

查看canal配置文件

vim /usr/local/canal/conf/example/meta.dat

找到對應的binlog信息更改一致即可

"journalName":"mysql-bin.000001","position":120,"

注意:如果不一致,可能導致以下錯誤

2019-06-17 19:35:20.918 [New I/O server worker #1-2] ERROR c.a.otter.canal.server.netty.handler.SessionHandler - something goes wrong with channel:[id: 0x7f2e9be3, /192.168.200.56:52225 => /192.168.200.128:11111], exception=java.io.IOException: Connection reset by peer

(4)啟動服務:

[root@localhost canal]# ./bin/startup.sh

(5)查看日志:

cat /usr/local/canal/logs/canal/canal.log

這樣就表示啟動成功了。

數據監控微服務

當用戶執行數據庫的操作的時候,binlog 日志會被canal捕獲到,並解析出數據。我們就可以將解析出來的數據進行相應的邏輯處理。

我們這里使用的一個開源的項目,它實現了springboot與canal的集成。比原生的canal更加優雅。

https://github.com/chenqian56131/spring-boot-starter-canal

使用前需要將starter-canal安裝到本地倉庫。

我們可以參照它提供的canal-test,進行代碼實現。

微服務搭建

(1)創建工程模塊changgou_canal,pom引入依賴

<dependency>
    <groupId>com.xpand</groupId>
    <artifactId>starter-canal</artifactId>
    <version>0.0.1-SNAPSHOT</version>
</dependency>

(2)創建包com.changgou.canal ,包下創建啟動類

@SpringBootApplication
@EnableCanalClient //聲明當前的服務是canal的客戶端
public class CanalApplication {
​
    public static void main(String[] args) {
        SpringApplication.run(CanalApplication.class,args);
    }
}

(3)添加配置文件application.properties

canal.client.instances.example.host=192.168.200.128
canal.client.instances.example.port=11111
canal.client.instances.example.batchSize=1000
spring.rabbitmq.host=192.168.200.128

(4)創建com.changgou.canal.listener包,包下創建類

@CanalEventListener //聲明當前的類是canal的監聽類
public class BusinessListener {
​
    @Autowired
    private RabbitTemplate rabbitTemplate;
​
    /**
     *
     * @param eventType 當前操作數據庫的類型
     * @param rowData 當前操作數據庫的數據
     */
    @ListenPoint(schema = "changgou_business",table = "tb_ad")
    public void adUpdate(CanalEntry.EventType eventType,CanalEntry.RowData rowData){
        System.out.println("廣告表數據發生改變");
        //獲取改變之前的數據
        rowData.getBeforeColumnsList().forEach((c)-> System.out.println("改變前的數據:"+c.getName()+"::"+c.getValue()));
​
        //獲取改變之后的數據
        rowData.getAfterColumnsList().forEach((c)-> System.out.println("改變之后的數據:"+c.getName()+"::"+c.getValue()));
    }
}

測試:啟動數據監控微服務,修改changgou_business的tb_ad表,觀察控制台輸出。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM