SpringBoot 整合cana 實現數據同步


微服務多數據庫情況下可以使用canal替代觸發器,canal是應阿里巴巴跨機房同步的業務需求而提出的,canal基於數據庫的日志解析,獲取變更進行增量訂閱&消費的業務。無論是canal實驗需要還是為了增量備份、主從復制和恢復,都是需要開啟mysql-binlog日志,數據目錄設置到不同的磁盤分區可以降低io等待。

官網:https://github.com/alibaba/canal

canal 工作原理
canal 模擬 MySQL slave 的交互協議,偽裝自己為 MySQL slave ,向 MySQL master 發送dump 協議
MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
canal 解析 binary log 對象(原始為 byte 流)

 

canal 搭建

搭建mysql環境

1,修改配置文件

[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復;

  重啟MySQL服務后,確認是否開啟了binlog(注意一點是MySQL8.x默認開啟binlog)SHOW VARIABLES LIKE '%bin%';:

 
          
         

 

 

2,授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant(省略第三步)

CREATE USER canal IDENTIFIED BY 'root'; 
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'root'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' ;
FLUSH PRIVILEGES;

3,新建一個用戶名canal密碼為QWqw12!@的新用戶,賦予REPLICATION SLAVE和 REPLICATION CLIENT權限:

CREATE USER canal IDENTIFIED BY '123456!@';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY '123456!@';

 

搭建canal環境

下載Linux最新穩定版(canal.deployer-1.1.4.tar.gz):https://github.com/alibaba/canal/releases

解壓后修改/canal/conf/example下的instance.properties配置文件:

canal.instance.master.address,數據庫地址,這里指定為127.0.0.1:3306。
canal.instance.dbUsername,監聽的數據庫用戶名。
canal.instance.dbPassword,監聽的數據庫密碼。
新增canal.instance.defaultDatabaseName,默認那個庫,這里指定為test(需要在MySQL中建立一個test庫)

啟動

sh /canal/bin/startup.sh
# 查看服務日志
tail -100f /canal/logs/canal/canal
# 查看實例日志  -- 一般情況下,關注實例日志即可
tail -100f /canal/logs/example/example.log

到目前為止 canal的服務端我們已經搭建好了 但是到目前 我們只是把數據庫的binlog 拉到canal中,我們還得編寫客戶端消費數據

 

properties配置文件

properties配置分為兩部分:

  • canal.properties (系統根配置文件)
  • instance.properties (instance級別的配置文件,每個instance一份)
  1. instance列表定義 (列出當前server上有多少個instance,每個instance的加載方式是spring/manager等)
  2. common參數定義,比如可以將instance.properties的公用參數,抽取放置到這里,這樣每個instance啟動的時候就可以共享. 【instance.properties配置定義優先級高於canal.properties】

instance.properties介紹:
a. 在canal.properties定義了canal.destinations后,需要在canal.conf.dir對應的目錄下建立同名的文件

比如:

canal.destinations = example1,example2  #spring客戶端注意指定的不同名字

這時需要創建example1和example2兩個目錄,每個目錄里各自有一份instance.properties.

 


兩種方式,官方提供的demo和springboot  starter

1,官方提供的

 <dependency>
       <groupId>com.alibaba.otter</groupId>
       <artifactId>canal.client</artifactId>
       <version>1.1.4</version>
 </dependency>
package com.example.demo.test;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;


public class CanalTest {

public static void main(String[] args) throws Exception {
//canal.ip = 192.168.56.104
//canal.port = 11111
//canal.destinations = example
//canal.user =
//canal.passwd =
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.56.104", 11111), "example", "", "");
try {
connector.connect();
//監聽的表, 格式為數據庫.表名,數據庫.表名
connector.subscribe(".*\\..*");
connector.rollback();

while (true) {
Message message = connector.getWithoutAck(100); // 獲取指定數量的數據
long batchId = message.getId();
if (batchId == -1 || message.getEntries().isEmpty()) {
Thread.sleep(1000);
continue;
}
// System.out.println(message.getEntries());
printEntries(message.getEntries());
connector.ack(batchId);// 提交確認,消費成功,通知server刪除數據
// connector.rollback(batchId);// 處理失敗, 回滾數據,后續重新獲取數據
}
}catch (Exception e){

}finally {
connector.disconnect();
}
    }

private static void printEntries(List<Entry> entries) throws Exception {
for (Entry entry : entries) {
if (entry.getEntryType() != EntryType.ROWDATA) {
continue;
}

RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());

EventType eventType = rowChange.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));

for (RowData rowData : rowChange.getRowDatasList()) {
switch (rowChange.getEventType()) {
case INSERT:
System.out.println("INSERT ");
printColumns(rowData.getAfterColumnsList());
break;
case UPDATE:
System.out.println("UPDATE ");
printColumns(rowData.getAfterColumnsList());
break;
case DELETE:
System.out.println("DELETE ");
printColumns(rowData.getBeforeColumnsList());
break;

default:
break;
}
}
}
}

private static void printColumns(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
}

操作數據庫增刪改,控制台則會打印

 參考:https://github.com/gxmanito/canal-client

https://gitee.com/zhiqishao/canal-client/tree/master

2,springboot starter

https://github.com/NormanGyllenhaal/canal-client

<dependency>
    <groupId>top.javatool</groupId>
    <artifactId>canal-spring-boot-starter</artifactId>
    <version>1.2.1-RELEASE</version>
</dependency>
package com.example.demo.test;

import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import top.javatool.canal.client.annotation.CanalTable;
import top.javatool.canal.client.handler.EntryHandler;

@Slf4j
@Component
@CanalTable(value = "test")
public class UserHandler implements EntryHandler<Test> {

    @Override
    public void insert(Test user) {
        log.info("insert message  {}", user);
    }

    @Override
    public void update(Test before, Test after) {
        log.info("update before {} ", before);
        log.info("update after {}", after);
    }
    @Override
    public void delete(Test user) {
        log.info("delete  {}", user);
    }
}
package com.example.demo.test;

import lombok.Data;

import java.io.Serializable;

/**
 * @Description //TODO
 * @Author GaoX
 * @Date 2020/6/28 14:44
 */
@Data
//@Table(name = "test")
public class Test implements Serializable {

    private Integer id;
    private String name;

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM