canal簡單安裝使用


canal簡介:https://github.com/alibaba/canal

1、數據庫配置

首先使用canal需要修改數據庫配置

[mysqld]
log-bin=mysql-bin # 開啟 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復

創建canal數據庫用戶

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

 

2、安裝canal

下載:https://github.com/alibaba/canal/releases

解壓(修改版本號):tar zxvf canal.deployer-1.1.4.tar.gz -C ./canal

配置開放服務器端口:11110、11111、11112

修改canal配置文件(這里設置了兩個instance,即兩個數據庫):

vi canal/conf/canal.properties
canal.destinations = example1,example2

 配置instance:

cp -R canal/conf/example conf/example1
mv conf/example conf/example2

第一個數據庫配置

vi canal/conf/example1/instance.properties
canal.instance.master.address=32.1.2.140:3306

第二個數據庫配置

vi canal/conf/example2/instance.properties
canal.instance.master.address=32.1.2.140:3307

#如果需要新增一個instance,只需要修改canal.properties文件,並新增一個instance配置即可,無需重啟canal。

運行:

sh canal/bin/startup.sh
# 查看日志
cat canal/logs/canal/canal

 

3、Java使用樣例

引入pom依賴,需要與安裝的canal版本一致

<dependencies>
    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.4</version>
    </dependency>
</dependencies>

示例代碼(異步打印兩個數據庫的修改內容):

package cn.spicybar.dblog;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;

import java.net.InetSocketAddress;
import java.util.List;

public class CanalClient {

    public static void main(String[] args) {
        new Thread(() -> initConnector("example1")).start();
        new Thread(() -> initConnector("example2")).start();
    }

    private static void initConnector(String destination) {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("32.1.0.237", 11111), destination, "", "");
        try {
            connector.connect();
            connector.subscribe(".*\\..*");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(1000);
                if (message.getId() != -1 && message.getEntries().size() > 0) {
                    printEntry(message.getEntries());
                }
                connector.ack(message.getId());
            }
        } finally {
            connector.disconnect();
        }
    }

    private static void printEntry(List<Entry> entries) {
        for (Entry entry : entries) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            try {
                RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
                System.out.println(rowChange.getSql());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser error, data:" + entry.toString(), e);
            }
        }
    }

}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM