使用CANAL同步數據


1.概要

canal 是阿里發布的一個mysql 同步工具,它是模擬 mysql slave 的方式讀取binlog,並可以將數據寫入到隊列中。

如下圖:是官方提供的架構圖。

 

2.下載CANAL

下載版本為1.1.5

其中

canal.deployer 是canal服務器

canal.admin 是CANAL可視化管理界面

 

3.配置canal

3.1 配置mysql

創建用戶並授權

 create user 'canal'@'%' identified by 'canal';
grant SELECT, REPLICATION SLAVE, REPLICATION CLIENT on *.* to 'canal'@'%';

配置mysql

# 打開binlog
log-bin=mysql-bin
# 選擇ROW(行)模式
binlog-format=ROW
# 配置MySQL replaction需要定義,不要和canal的slaveId重復
server_id=1

配置后重啟mysql

 

 查看binlog文件列表

 

查看當前寫入的log文件

 

 3.2 配置canal

 

編輯文件 conf/example/instance.properties

canal.instance.gtidon=false

# mysql地址
canal.instance.master.address=localhost:3306
# mysql 日志文件
canal.instance.master.journal.name=mysql-bin.000001
# 配置日志起始位置,配置為上圖的 position。
canal.instance.master.position=3970
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true


# 用戶名密碼
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\.slave_.*


# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0

配置好后

 

正常的啟動如下

 

是否啟動成功,我們可以查看日志數據。

 

如果是mysql8 可能會報如下錯誤

Canal 1.1.5 啟動報錯:caching_sha2_password Auth failed

這個是mysql 的密碼驗證失敗。

ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'canal';

執行這個后,在啟動canal。

 

3.3 開發java程序讀取同步數據

開發一個springboot程序。

引入jar包。

<dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.5</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.protocol</artifactId>
            <version>1.1.5</version>
        </dependency>

開發編輯代碼如下:

package com.example.canaldemo;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.stereotype.Component;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.exception.CanalClientException;

import java.net.InetSocketAddress;
import java.util.List;

@Component
public class CannalClient implements InitializingBean {

    private final static int BATCH_SIZE = 1000;

    @Override
    public void afterPropertiesSet() throws Exception {
        // 創建鏈接
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111),
                "example", "", "");
        try {
            //打開連接
            connector.connect();
            //訂閱數據庫表,全部表
            connector.subscribe(".*\\..*");
            //回滾到未進行ack的地方,下次fetch的時候,可以從最后一個沒有ack的地方開始拿
            connector.rollback();
            while (true) {
                // 獲取指定數量的數據
                Message message = connector.getWithoutAck(BATCH_SIZE);
                //獲取批量ID
                long batchId = message.getId();
                //獲取批量的數量
                int size = message.getEntries().size();
                //如果沒有數據
                if (batchId == -1 || size == 0) {
                    try {
                        //線程休眠2秒
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                } else {
                    //如果有數據,處理數據
                    printEntry(message.getEntries());
                }
                //進行 batch id 的確認。確認之后,小於等於此 batchId 的 Message 都會被確認。
                connector.ack(batchId);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            connector.disconnect();
        }
    }

    /**
     * 打印canal server解析binlog獲得的實體類信息
     */
    private static void printEntry(List<CanalEntry.Entry> entrys) {
        for (CanalEntry.Entry entry : entrys) {
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                //開啟/關閉事務的實體類型,跳過
                continue;
            }
            //RowChange對象,包含了一行數據變化的所有特征
            //比如isDdl 是否是ddl變更操作 sql 具體的ddl sql beforeColumns afterColumns 變更前后的數據字段等等
            CanalEntry.RowChange rowChage;
            try {
                rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            //獲取操作類型:insert/update/delete類型
            CanalEntry.EventType eventType = rowChage.getEventType();
            //打印Header信息
            System.out.println(String.format("================》; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));
            //判斷是否是DDL語句
            if (rowChage.getIsDdl()) {
                System.out.println("================》;isDdl: true,sql:" + rowChage.getSql());
            }
            //獲取RowChange對象里的每一行數據,打印出來
            for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                //如果是刪除語句
                if (eventType == CanalEntry.EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                    //如果是新增語句
                } else if (eventType == CanalEntry.EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                    //如果是更新的語句
                } else {
                    //變更前的數據
                    System.out.println("------->; before");
                    printColumn(rowData.getBeforeColumnsList());
                    //變更后的數據
                    System.out.println("------->; after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<CanalEntry.Column> columns) {
        for (CanalEntry.Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

執行后,我們操作數據庫表,比如刪除數據。

 

這樣我們就可以通過java程序讀取canal讀取的數據。當然我們可以通過代碼將數據插入到其他的數據庫中。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM