canal 基於Mysql數據庫增量日志解析


canal 基於Mysql數據庫增量日志解析

 1.前言

 最近太多事情 工作的事情,以及終身大事等等 耽誤更新,由於最近做項目需要同步監聽 未來電視 mysql的變更了解到公司會用canal做增量監聽,就嘗試使用了一下 這里做個demo 簡單的記錄一下。

 2.canal簡介

 canal:主要用途是基於 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費的中間件
 當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x

Xnip20200117_145806.png

 3.MySQL 注備復制原理

Xnip20200117_150056.png

  3.1 mysql主備復制工作原理

  1.MySQL master 將數據變更寫入二進制日志( binary log, 其中記錄叫做二進制日志事件binary log events,可以通過 show binlog events 進行查看)
  2.MySQL slave 將 master 的 binary log events 拷貝到它的中繼日志(relay log)
  3.MySQL slave 重放 relay log 中事件,將數據變更反映它自己的數據

  3.2 canal 工作原理

  1.canal 模擬 MySQL slave 的交互協議,偽裝自己為 MySQL slave ,向 MySQL master 發送dump 協議
  2.MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal )
  3.canal 解析 binary log 對象(原始為 byte 流)

 4.准備

 對於自建MySQL ,需要先開啟 Binlog寫入功能,並且配置binlog-format 為Row模式 在my.cnf中配置

Xnip20200117_151803.png

 授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

 5.canal 下載安裝配置

  5.1 canal下載

  canal 下載地址 (下載速度可能很慢)

  下載 canal.deployer-xxx.tar.gz 如 canal.deployer-1.1.4.tar.gz

  解壓后 可以看到如下結構
Xnip20200117_150905.png

  5.2 canal 初始配置

  配置修改:

vim conf/example/instance.properties

  如下:

#################################################
## mysql serverId
canal.instance.mysql.slaveId = 2020

# position info 修改自己的數據庫(canal要監聽的數據庫 地址 )
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name = 
canal.instance.master.position = 
canal.instance.master.timestamp = 

#canal.instance.standby.address = 
#canal.instance.standby.journal.name =
#canal.instance.standby.position = 
#canal.instance.standby.timestamp = 

# username/password 修改成自己 數據庫信息的賬號 (單獨開一個 准備階段創建的賬號)
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8

# table regex  表的監聽規則 
# canal.instance.filter.regex = blogs\.blog_info  
canal.instance.filter.regex = .\*\\\\..\*
# table black regex
canal.instance.filter.black.regex = 

  啟動canal

sh bin/startup.sh

  查看server日志
  看到 the canal server is running now 表示啟動成功

vi logs/canal/canal.log


2020-01-08 15:25:33.361 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## 	start the canal server.
2020-01-08 15:25:33.468 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.12.245:11111]
2020-01-08 15:25:34.061 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......

  查看instance的日志

vi logs/example/example.log

2020-01-08 15:25:33.864 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-blogs 
2020-01-08 15:25:33.998 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start successful....
2020-01-08 15:25:33.999 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.otter.canal.parse.inbound.mysql.MysqlEventParser - prepare to find start position just show master status

  5.3 擴展 destination 配置

vi conf/canal.properties

  在canal.destinations 處可以配置當前server上部署的instance 列表 默認為 example ,我這里改成了 blogs最好對應數據庫名稱。一個instance 對應一個 數據庫

Xnip20200117_153243.png

Xnip20200117_153644.png

 6.創建Java 客戶端 監聽canal 消費數據

  6.1 創建maven項目

  6.2 添加canal client POM 依賴

<dependency>
  <groupId>com.alibaba.otter</groupId>
  <artifactId>canal.client</artifactId>
  <version>1.1.0</version>
</dependency>

  6.3 創建 canal 的客戶端監聽

  CanalMessageListener.java

  該類實現InitializingBean 主要是在初始化的時候 執行 init 方法,在init()方法中 創建 CanalConnector對象,連接需要監聽的canal,主要提供 canal的 host ,port ,destination ,以及username 和 password

  parse 方法 主要用於將監聽的對象 通過反射等轉換成對應的實體類

/**
* @author johnny
**/
@Component
@Slf4j
@ConditionalOnProperty(name = "application.canal.accessor", havingValue = "canal")
public class CanalMessageListener implements InitializingBean, ParseCanal {


private CanalConnector connector;

@Autowired
private CanalConfig canalConfig;

@Autowired
private IParseDispatcher configParseDispatcher;

private void init() {
    //創建canal 監聽 傳入host port destination等參數
    connector = CanalConnectors.newSingleConnector(new InetSocketAddress(canalConfig.getHost(), canalConfig.getPort()),
            canalConfig.getDestination(), canalConfig.getUsername(), canalConfig.getPassword());
    connector.connect();
    //  .*\..*
    connector.subscribe(".*\\..*");
    connector.rollback();

    new Thread(() -> {
	
        while (true) {
            Message message = connector.getWithoutAck(canalConfig.getBatchSize());
            long batchId = message.getId();
            long size = message.getEntries().size();
	    //batchId == -1 表示沒有數據變更
            if (batchId == -1 || size == 0) {
                System.out.println("empty data ");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                }
            } else {
	        //解析數據變更
                resoleveEntry(message.getEntries());
            }
        }

    }).start();

}
//解析數據變更
private void resoleveEntry(List<CanalEntry.Entry> entries) {
    CanalEntry.RowChange rowChange = null;
    for (CanalEntry.Entry row : entries) {
     //判斷是否是 事物開始 和 事物結束 
        if (row.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || row.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
            continue;
        }
        try {
            rowChange = CanalEntry.RowChange.parseFrom(row.getStoreValue());
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }

        List<CanalEntry.RowData> rowDataList = rowChange.getRowDatasList();
        String tableName = row.getHeader().getTableName();
        CanalEntry.EventType eventType = row.getHeader().getEventType();

        for (CanalEntry.RowData rowData : rowDataList) {
            if (eventType == CanalEntry.EventType.UPDATE) {
                List<CanalEntry.Column> columns = rowData.getBeforeColumnsList();
                Object object = parse(columns, tableName);
                log.info("收到的 object:{}", JsonUtils.marshalToString(object));
                //根據收到的對象 處理后續業務邏輯
            }
        }

    }
}

@Override
public void afterPropertiesSet() throws Exception {
    init();
}

//解析 List<CanalEntry.Column>對象到對應的 實體類
@Override
public Object parse(List<CanalEntry.Column> canalDatas, String tableName) {
//根據配置好的map 從中根據key 表名 獲取對應的映射后的 實體類class
    String className = configParseDispatcher.dispatch(tableName);
    Object entity = null;
    Class c = null;
    try {
        c = Class.forName(className);
        entity = c.newInstance();
    } catch (ClassNotFoundException e) {
        log.error("【未找到對應 {} 的 實體類 】", className);
    } catch (Exception e) {
    }

    for (CanalEntry.Column canalDataColumn : canalDatas) {
        String columnName = canalDataColumn.getName();
        Field[] fields = c.getDeclaredFields();

        for (Field field : fields) {
            Object fieldValue = null;
            field.setAccessible(true);
            String fiedName = CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, field.getName());
            log.info("【filedName: {}】", fiedName);
            if (fiedName.equals(columnName)) {
                try {
                    if (Long.class.equals(field.getType())) {
                        fieldValue = NumberUtils.toLong(canalDataColumn.getValue());
                    }else if(Integer.class.equals(field.getType())){
                        fieldValue = NumberUtils.toInt(canalDataColumn.getValue());
                    }else if(Double.class.equals(field.getType())){
                        fieldValue = NumberUtils.toDouble(canalDataColumn.getValue());
                    }else if(Date.class.equals(field.getType())){
                        try {
                            fieldValue = DateUtils.parseDate(canalDataColumn.getValue(), new String[]{"yyyy-MM-dd HH:mm:ss"});
                        } catch (ParseException e) {
                            e.printStackTrace();
                        }
                    }else{
                        fieldValue = canalDataColumn.getValue();
                    }
                    field.set(entity, fieldValue);
                    break;
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
            }
        }

    }
    return entity;
}
}

  application.yml
  配置canal 地址,以及表名和實體的映射規則

server:
port: 8881



application:
  canal:
    accessor: canal
    host: 127.0.0.1
    port: 11111
    username:
    password:
    destination: blogs
    batchSize: 30

    parse:   規則,根據表名獲取對應要映射的 實體class
      rule:
        mapping:
          blog_info: com.johnny.canal.canal_test.entity.BlogInfo

  IParseDispatcher.java
  接口:用來根據表名key獲取對應的 要映射的實體,這里寫成接口是因為可以提供多種獲取方式,比如我這里通過yml 配置去獲取

/**
* @author johnny
* @create 2020-01-17 上午11:09
**/
public interface IParseDispatcher {

 String dispatch(String key);

}

  ConfigParseDispatcher.java
  實現上面的接口,提供一種從 application.yml 獲取初始源配置 根據 application.canal.parse.rule進行配置

/**
* @author johnny
* @create 2020-01-17 上午11:07
**/
@Data
@Configuration
@ConfigurationProperties(prefix = "application.canal.parse.rule")
public class ConfigParseDispatcher implements IParseDispatcher {

private Map<String,String> mapping=new HashMap<>();

@Override
public String dispatch(String key) {
    return mapping.get(key);
}

}

  7.演示

  啟動項目 此時控制台打印 empty data ,無數據變更

Xnip20200117_160125.png

  通過執行 在 canal監聽的mysql 上執行 更新語句

update blog_info set blog_title = 'SpringBoot配置相關for canal test '  where id = 40

  debug 程序,當執行上面的update語句后 可以看到立即收到
Xnip20200117_160552.png

  通過parse方法解析為對應的 實體對象,后續做自己的業務邏輯 即可

Xnip20200117_160718.png

 8.總結

本篇主要介紹了canal是什么,如何下載安裝和配置 ,以及提供了自己寫的一個簡單demo 。后續有機會深入了解一下canal的其他功能,比如 如何同步到Kafka/RocketMQ等等。。

個人博客地址: https://www.askajohnny.com 歡迎訪問!
本文由博客一文多發平台 OpenWrite 發布!


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM