Getting Started with Canal


1 Canal Environment Setup

1.1 Installing the Database with Docker

Configuration file:

[root@other example]# cat /mydata/mysql/master/conf/my.cnf
[client]
default-character-set=utf8

[mysql]
default-character-set=utf8


[mysqld]
init_connect='set collation_connection=utf8_unicode_ci'
init_connect='SET NAMES utf8'
character-set-server=utf8
collation-server=utf8_unicode_ci
skip-character-set-client-handshake
skip-name-resolve

server_id=1
log-bin=mysql-bin
binlog_format=ROW
[root@other example]#

1.2 MySQL Configuration

Canal works by reading the MySQL binlog, so binary logging must be enabled on the MySQL server. ROW is the recommended binlog format.

To check the current setting:

SHOW VARIABLES LIKE 'binlog_format' ;

Set the following options:

[mysqld]
log-bin=mysql-bin  # enable the binlog; adding this line is enough
binlog_format=ROW   # use ROW mode
server_id=1      # required for MySQL replication; must not be the same as Canal's slaveId
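
As a quick sanity check of the three options above, the following sketch parses the text of a my.cnf file and reports which binlog prerequisites are missing. The class and method names (`BinlogConfigCheck`, `parseMysqld`, `missingSettings`) are invented for illustration. It reads only the `[mysqld]` section, treats `-` and `_` in option names as equivalent, and uses a deliberately naive comment stripper, so take it as a rough aid rather than a full my.cnf parser.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class BinlogConfigCheck {

    /** Collects key=value pairs from the [mysqld] section, normalizing '-' to '_' in keys. */
    static Map<String, String> parseMysqld(String myCnf) {
        Map<String, String> settings = new HashMap<>();
        String section = "";
        for (String raw : myCnf.split("\\R")) {
            String line = raw.trim();
            int hash = line.indexOf('#');           // naive: also cuts a '#' inside a quoted value
            if (hash >= 0) {
                line = line.substring(0, hash).trim();
            }
            if (line.isEmpty()) {
                continue;
            }
            if (line.startsWith("[") && line.endsWith("]")) {
                section = line.substring(1, line.length() - 1).trim();
                continue;
            }
            if (!section.equals("mysqld")) {
                continue;
            }
            int eq = line.indexOf('=');
            String key = (eq < 0 ? line : line.substring(0, eq)).trim().replace('-', '_');
            String value = eq < 0 ? "" : line.substring(eq + 1).trim();
            settings.put(key, value);
        }
        return settings;
    }

    /** Lists the binlog prerequisites from this section that the given my.cnf text does not satisfy. */
    static List<String> missingSettings(String myCnf) {
        Map<String, String> s = parseMysqld(myCnf);
        List<String> problems = new ArrayList<>();
        if (!s.containsKey("log_bin")) {
            problems.add("log-bin is not set");
        }
        if (!"ROW".equalsIgnoreCase(s.get("binlog_format"))) {
            problems.add("binlog_format is not ROW");
        }
        if (!s.containsKey("server_id")) {
            problems.add("server_id is not set");
        }
        return problems;
    }

    public static void main(String[] args) {
        String cnf = "[mysqld]\nlog-bin=mysql-bin\nbinlog_format=ROW\nserver_id=1\n";
        System.out.println(missingSettings(cnf)); // prints []
    }
}
```

The check only inspects the file. The `SHOW VARIABLES` query is still the way to confirm what the running server actually uses.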

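Tips:

  1. Row

The log records each modified row, and the slave then applies the same change to the same row.

Advantage: in row mode the binlog does not need to record the context of the executed SQL statement. It only records which row was modified and what it was changed to. The log therefore captures every row-level change in detail and is easy to interpret. It also avoids the cases in which calls to stored procedures, functions, or triggers cannot be replicated correctly.

  2. Statement

Every SQL statement that modifies data is written to the master's binlog. During replication, the slave's SQL thread parses it into the same statement that ran on the master and executes it again.

Advantage: statement mode avoids the drawback of row mode. It does not record every row change, so it produces less binlog data, saves I/O and storage, and improves performance. It only records the statements executed on the master together with their execution context.

Disadvantage: because statement mode records statements rather than results, it must also record context for each statement so that the statement produces the same result on the slave as it did on the master. In addition, MySQL evolves quickly and keeps gaining features, which makes replication harder: the more complex the replicated content, the more likely bugs become. A number of cases are already known to break statement-based replication, mostly when data is modified using certain functions or features. For example, sleep() is not replicated correctly in some versions, and using last_insert_id() in a stored procedure can produce different ids on the slave and the master. Row mode records changes row by row, so it does not have these problems.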
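
The difference between the two modes can be illustrated with a toy model. It is not MySQL's replication code: `Server`, `executeSetToLocalValue`, and `applyRowImage` are hypothetical names, and the "local value" stands for any function whose result depends on server-local state. Replaying the statement lets each server compute its own value, while shipping the row image copies the master's result.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

final class ReplicationModes {

    /** A toy server: one table (id -> value) plus a source of server-local state. */
    static final class Server {
        final Map<Integer, Long> table = new HashMap<>();
        final LongSupplier localValue;

        Server(LongSupplier localValue) {
            this.localValue = localValue;
        }

        /** Stand-in for a statement whose result depends on server-local state. */
        void executeSetToLocalValue(int id) {
            table.put(id, localValue.getAsLong());
        }

        /** Stand-in for applying a row event: the after-image is written as-is. */
        void applyRowImage(int id, long afterValue) {
            table.put(id, afterValue);
        }
    }

    /** Statement-style replay: the replica re-executes the same statement. */
    static boolean statementModeConverges() {
        Server master = new Server(() -> 1000L);
        Server replica = new Server(() -> 1005L); // local state differs on the replica
        master.executeSetToLocalValue(1);
        replica.executeSetToLocalValue(1);
        return master.table.equals(replica.table);
    }

    /** Row-style replay: the replica receives the value the master ended up with. */
    static boolean rowModeConverges() {
        Server master = new Server(() -> 1000L);
        Server replica = new Server(() -> 1005L);
        master.executeSetToLocalValue(1);
        replica.applyRowImage(1, master.table.get(1));
        return master.table.equals(replica.table);
    }

    public static void main(String[] args) {
        System.out.println("statement mode converges: " + statementModeConverges()); // false
        System.out.println("row mode converges: " + rowModeConverges());             // true
    }
}
```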

1.3 Creating and Authorizing a MySQL User

Canal works by presenting itself as a MySQL slave, so its account needs the privileges of a replication slave. Create a replication account and grant it the required privileges:

CREATE USER canal@'localhost' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'localhost';
FLUSH PRIVILEGES; 

2 Deploying and Installing Canal

2.1 Upload and Extract

After extraction, the directory looks like this:

Directory overview:

bin: executable scripts

conf: Canal configuration files

lib: libraries that Canal depends on

logs: log files

2.2 Configuration

Edit canal/conf/example/instance.properties:

[root@other canal]# cat conf/example/instance.properties
#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
[root@other canal]#
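
The value of `canal.instance.filter.regex` is written with a doubled backslash because `.properties` files treat `\` as an escape character. The sketch below loads the same line with `java.util.Properties` and then tests table names against the result. It assumes that each comma-separated pattern is matched against the full `schema.table` name. Canal's own filter implementation may differ in details such as case handling, and `FilterRegexDemo` and its methods are hypothetical helpers, not part of Canal.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;
import java.util.regex.Pattern;

final class FilterRegexDemo {

    /** Loads one value from .properties-style text, letting java.util.Properties handle the escapes. */
    static String loadValue(String propertiesText, String key) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new IllegalStateException("unexpected I/O error on in-memory text", e);
        }
        return props.getProperty(key);
    }

    /** Returns true if "schema.table" fully matches any of the comma-separated patterns. */
    static boolean accepts(String filterRegex, String schema, String table) {
        String fullName = schema + "." + table;
        // Naive split: a pattern that itself contains a comma would be cut in two.
        for (String part : filterRegex.split(",")) {
            if (Pattern.matches(part.trim(), fullName)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // The Java literal below is the file line: canal.instance.filter.regex=.*\\..*
        String value = loadValue("canal.instance.filter.regex=.*\\\\..*\n",
                "canal.instance.filter.regex");
        System.out.println(value);                                      // .*\..*
        System.out.println(accepts(value, "canal", "tb_book"));         // true
        System.out.println(accepts("canal\\..*", "canal", "tb_book"));  // true
        System.out.println(accepts("canal\\..*", "other", "tb_book"));  // false
    }
}
```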

To start Canal, run startup.sh in the bin directory.

3 Data Pull Test

Start with the test code from the official site. The Maven dependencies are:

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.protocol</artifactId>
    <version>1.1.4</version>
</dependency>
<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>2.6</version>
</dependency>

Test classes:

package com.dalianpai.canal;

import java.io.UnsupportedEncodingException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.Pair;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionBegin;
import com.alibaba.otter.canal.protocol.CanalEntry.TransactionEnd;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

/**
 * @author WGR
 * @create 2020/9/4 -- 23:25
 */
public class BaseCanalClientTest {
    protected final static Logger logger             = LoggerFactory.getLogger(AbstractCanalClientTest.class);
    protected static final String             SEP                = SystemUtils.LINE_SEPARATOR;
    protected static final String             DATE_FORMAT        = "yyyy-MM-dd HH:mm:ss";
    protected volatile boolean                running            = false;
    protected Thread.UncaughtExceptionHandler handler            = new Thread.UncaughtExceptionHandler() {

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            logger.error("parse events has an error", e);
        }
    };
    protected Thread                          thread             = null;
    protected CanalConnector connector;
    protected static String                   context_format     = null;
    protected static String                   row_format         = null;
    protected static String                   transaction_format = null;
    protected String                          destination;

    static {
        context_format = SEP + "****************************************************" + SEP;
        context_format += "* Batch Id: [{}] ,count : [{}] , memsize : [{}] , Time : {}" + SEP;
        context_format += "* Start : [{}] " + SEP;
        context_format += "* End : [{}] " + SEP;
        context_format += "****************************************************" + SEP;

        row_format = SEP
                + "----------------> binlog[{}:{}] , name[{},{}] , eventType : {} , executeTime : {}({}) , gtid : ({}) , delay : {} ms"
                + SEP;

        transaction_format = SEP
                + "================> binlog[{}:{}] , executeTime : {}({}) , gtid : ({}) , delay : {}ms"
                + SEP;

    }

    protected void printSummary(Message message, long batchId, int size) {
        long memsize = 0;
        for (Entry entry : message.getEntries()) {
            memsize += entry.getHeader().getEventLength();
        }

        String startPosition = null;
        String endPosition = null;
        if (!CollectionUtils.isEmpty(message.getEntries())) {
            startPosition = buildPositionForDump(message.getEntries().get(0));
            endPosition = buildPositionForDump(message.getEntries().get(message.getEntries().size() - 1));
        }

        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        logger.info(context_format, new Object[] { batchId, size, memsize, format.format(new Date()), startPosition,
                endPosition });
    }

    protected String buildPositionForDump(Entry entry) {
        long time = entry.getHeader().getExecuteTime();
        Date date = new Date(time);
        SimpleDateFormat format = new SimpleDateFormat(DATE_FORMAT);
        String position = entry.getHeader().getLogfileName() + ":" + entry.getHeader().getLogfileOffset() + ":"
                + entry.getHeader().getExecuteTime() + "(" + format.format(date) + ")";
        if (StringUtils.isNotEmpty(entry.getHeader().getGtid())) {
            position += " gtid(" + entry.getHeader().getGtid() + ")";
        }
        return position;
    }

    protected void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            long executeTime = entry.getHeader().getExecuteTime();
            long delayTime = new Date().getTime() - executeTime;
            Date date = new Date(entry.getHeader().getExecuteTime());
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
                    TransactionBegin begin = null;
                    try {
                        begin = TransactionBegin.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
                    // Print the transaction header: executing thread id and transaction timing
                    logger.info(transaction_format,
                            new Object[] { entry.getHeader().getLogfileName(),
                                    String.valueOf(entry.getHeader().getLogfileOffset()),
                                    String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                                    entry.getHeader().getGtid(), String.valueOf(delayTime) });
                    logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
                    printXAInfo(begin.getPropsList());
                } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    TransactionEnd end = null;
                    try {
                        end = TransactionEnd.parseFrom(entry.getStoreValue());
                    } catch (InvalidProtocolBufferException e) {
                        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                    }
                    // Print the transaction commit info: transaction id
                    logger.info("----------------\n");
                    logger.info(" END ----> transaction id: {}", end.getTransactionId());
                    printXAInfo(end.getPropsList());
                    logger.info(transaction_format,
                            new Object[] { entry.getHeader().getLogfileName(),
                                    String.valueOf(entry.getHeader().getLogfileOffset()),
                                    String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                                    entry.getHeader().getGtid(), String.valueOf(delayTime) });
                }

                continue;
            }

            if (entry.getEntryType() == EntryType.ROWDATA) {
                RowChange rowChage = null;
                try {
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }

                EventType eventType = rowChage.getEventType();

                logger.info(row_format,
                        new Object[] { entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
                                entry.getHeader().getTableName(), eventType,
                                String.valueOf(entry.getHeader().getExecuteTime()), simpleDateFormat.format(date),
                                entry.getHeader().getGtid(), String.valueOf(delayTime) });

                if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
                    logger.info(" sql ----> " + rowChage.getSql() + SEP);
                    continue;
                }

                printXAInfo(rowChage.getPropsList());
                for (RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        printColumn(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        printColumn(rowData.getAfterColumnsList());
                    } else {
                        printColumn(rowData.getAfterColumnsList());
                    }
                }
            }
        }
    }

    protected void printColumn(List<Column> columns) {
        for (Column column : columns) {
            StringBuilder builder = new StringBuilder();
            try {
                if (StringUtils.containsIgnoreCase(column.getMysqlType(), "BLOB")
                        || StringUtils.containsIgnoreCase(column.getMysqlType(), "BINARY")) {
                    // get value bytes
                    builder.append(column.getName() + " : "
                            + new String(column.getValue().getBytes("ISO-8859-1"), "UTF-8"));
                } else {
                    builder.append(column.getName() + " : " + column.getValue());
                }
            } catch (UnsupportedEncodingException e) {
            }
            builder.append("    type=" + column.getMysqlType());
            if (column.getUpdated()) {
                builder.append("    update=" + column.getUpdated());
            }
            builder.append(SEP);
            logger.info(builder.toString());
        }
    }

    protected void printXAInfo(List<Pair> pairs) {
        if (pairs == null) {
            return;
        }

        String xaType = null;
        String xaXid = null;
        for (Pair pair : pairs) {
            String key = pair.getKey();
            if (StringUtils.endsWithIgnoreCase(key, "XA_TYPE")) {
                xaType = pair.getValue();
            } else if (StringUtils.endsWithIgnoreCase(key, "XA_XID")) {
                xaXid = pair.getValue();
            }
        }

        if (xaType != null && xaXid != null) {
            logger.info(" ------> " + xaType + " " + xaXid);
        }
    }

    public void setConnector(CanalConnector connector) {
        this.connector = connector;
    }

    /**
     * Example: get the GTID of the current Entry
     *
     * @param header
     * @return
     */
    public static String getCurrentGtid(CanalEntry.Header header) {
        List<CanalEntry.Pair> props = header.getPropsList();
        if (props != null && props.size() > 0) {
            for (CanalEntry.Pair pair : props) {
                if ("curtGtid".equals(pair.getKey())) {
                    return pair.getValue();
                }
            }
        }
        return "";
    }

    /**
     * Example: get the GTID sequence number of the current Entry
     *
     * @param header
     * @return
     */
    public static String getCurrentGtidSn(CanalEntry.Header header) {
        List<CanalEntry.Pair> props = header.getPropsList();
        if (props != null && props.size() > 0) {
            for (CanalEntry.Pair pair : props) {
                if ("curtGtidSn".equals(pair.getKey())) {
                    return pair.getValue();
                }
            }
        }
        return "";
    }

    /**
     * Example: get the GTID last-committed value of the current Entry
     *
     * @param header
     * @return
     */
    public static String getCurrentGtidLct(CanalEntry.Header header) {
        List<CanalEntry.Pair> props = header.getPropsList();
        if (props != null && props.size() > 0) {
            for (CanalEntry.Pair pair : props) {
                if ("curtGtidLct".equals(pair.getKey())) {
                    return pair.getValue();
                }
            }
        }
        return "";
    }
}

package com.dalianpai.canal;

import org.slf4j.MDC;
import org.springframework.util.Assert;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.Message;

/**
 * @author WGR
 * @create 2020/9/4 -- 23:26
 */
public class AbstractCanalClientTest extends BaseCanalClientTest {
    public AbstractCanalClientTest(String destination){
        this(destination, null);
    }

    public AbstractCanalClientTest(String destination, CanalConnector connector){
        this.destination = destination;
        this.connector = connector;
    }

    protected void start() {
        Assert.notNull(connector, "connector is null");
        thread = new Thread(new Runnable() {

            @Override
            public void run() {
                process();
            }
        });

        thread.setUncaughtExceptionHandler(handler);
        running = true;
        thread.start();
    }

    protected void stop() {
        if (!running) {
            return;
        }
        running = false;
        if (thread != null) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                // ignore
            }
        }

        MDC.remove("destination");
    }

    protected void process() {
        int batchSize = 5 * 1024;
        while (running) {
            try {
                MDC.put("destination", destination);
                connector.connect();
                connector.subscribe();
                while (running) {
                    Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries without acknowledging them
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        // try {
                        // Thread.sleep(1000);
                        // } catch (InterruptedException e) {
                        // }
                    } else {
                        printSummary(message, batchId, size);
                        printEntry(message.getEntries());
                    }

                    if (batchId != -1) {
                        connector.ack(batchId); // acknowledge the batch
                    }
                }
            } catch (Throwable e) {
                logger.error("process error!", e);
                try {
                    Thread.sleep(1000L);
                } catch (InterruptedException e1) {
                    // ignore
                }

                connector.rollback(); // processing failed, roll back
            } finally {
                connector.disconnect();
                MDC.remove("destination");
            }
        }
    }
}

package com.dalianpai.canal;

import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;

/**
 * @author WGR
 * @create 2020/9/4 -- 23:23
 */
public class  SimpleCanalClientTest extends AbstractCanalClientTest{

    public SimpleCanalClientTest(String destination){
        super(destination);
    }

    public static void main(String args[]) {
        // Connect directly by IP; no HA support
        String destination = "example";
        String ip = AddressUtils.getHostIp();
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.1.121", 11111),
                destination,
                "canal",
                "canal");

        final SimpleCanalClientTest clientTest = new SimpleCanalClientTest(destination);
        clientTest.setConnector(connector);
        clientTest.start();
        Runtime.getRuntime().addShutdownHook(new Thread() {

            @Override
            public void run() {
                try {
                    logger.info("## stop the canal client");
                    clientTest.stop();
                } catch (Throwable e) {
                    logger.warn("##something goes wrong when stopping canal:", e);
                } finally {
                    logger.info("## canal client is down.");
                }
            }

        });
    }

}
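
The core of `process()` in `AbstractCanalClientTest` is a get, ack, rollback cycle. The toy model below mimics that cycle with an in-memory list instead of a Canal server. `ToyConnector` is a hypothetical stand-in, not Canal's `CanalConnector`, and it simplifies things by tracking positions rather than batch ids. Under those assumptions it shows why a batch that is rolled back is handed out again on the next fetch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class AckRollbackModel {

    /** Simplified connector: entries are delivered in order and confirmed by position. */
    static final class ToyConnector {
        private final List<String> log;
        private int acked = 0;    // entries [0, acked) are confirmed
        private int fetched = 0;  // entries [acked, fetched) were handed out but not confirmed

        ToyConnector(List<String> log) {
            this.log = log;
        }

        /** Hands out up to batchSize entries without confirming them. */
        List<String> getWithoutAck(int batchSize) {
            int end = Math.min(log.size(), fetched + batchSize);
            List<String> batch = new ArrayList<>(log.subList(fetched, end));
            fetched = end;
            return batch;
        }

        /** Confirms everything handed out so far. */
        void ack() {
            acked = fetched;
        }

        /** Forgets unconfirmed deliveries so they are handed out again. */
        void rollback() {
            fetched = acked;
        }
    }

    /** Runs one failed attempt followed by successful ones and returns everything the consumer saw. */
    static List<String> demo() {
        ToyConnector connector = new ToyConnector(Arrays.asList("e1", "e2", "e3"));
        List<String> seen = new ArrayList<>();

        seen.addAll(connector.getWithoutAck(2)); // e1, e2
        connector.rollback();                    // pretend processing failed

        seen.addAll(connector.getWithoutAck(2)); // e1, e2 again
        connector.ack();

        seen.addAll(connector.getWithoutAck(2)); // e3
        connector.ack();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [e1, e2, e1, e2, e3]
    }
}
```

In this model the consumer sees `e1` and `e2` twice, which suggests that handlers built on this pattern should tolerate repeated entries.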

3.1 Data Change Test

3.1.1 Creating a Table

Create the tb_book table:

CREATE TABLE tb_book (
 id INT(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
 name VARCHAR(100) NOT NULL COMMENT '書名',
 author VARCHAR(100) DEFAULT NULL COMMENT '作者',
 publishtime DATETIME DEFAULT NULL COMMENT '發行日期',
 price DOUBLE(10,2) DEFAULT NULL COMMENT '價格',
 publishgroup VARCHAR(100) DEFAULT NULL COMMENT '發版社',
 PRIMARY KEY (id)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4;

Log output:

23:36:43.256 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - 
----------------> binlog[mysql-bin.000004:405] , name[canal,tb_book] , eventType : CREATE , executeTime : 1599233803000(2020-09-04 23:36:43) , gtid : () , delay : 256 ms

23:36:43.256 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest -  sql ----> CREATE TABLE `tb_book` (
`id` INT(11) NOT NULL AUTO_INCREMENT COMMENT '主鍵',
`name` VARCHAR(100) NOT NULL COMMENT '書名',
`author` VARCHAR(100) DEFAULT NULL COMMENT '作者',
`publishtime` DATETIME DEFAULT NULL COMMENT '發行日期',
`price` DOUBLE(10,2) DEFAULT NULL COMMENT '價格',
`publishgroup` VARCHAR(100) DEFAULT NULL COMMENT '發版社',
PRIMARY KEY (`id`)
) ENGINE=INNODB DEFAULT CHARSET=utf8mb4

23:36:57.670 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - 
****************************************************
* Batch Id: [4] ,count : [6] , memsize : [452] , Time : 2020-09-04 23:36:57
* Start : [mysql-bin.000004:966:1599233817000(2020-09-04 23:36:57)] 
* End : [mysql-bin.000004:1580:1599233817000(2020-09-04 23:36:57)] 
****************************************************

23:36:57.673 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - 
================> binlog[mysql-bin.000004:966] , executeTime : 1599233817000(2020-09-04 23:36:57) , gtid : () , delay : 671ms

3.1.2 Inserting Data

Run the SQL:

INSERT INTO tb_book(NAME , author , publishtime , price , publishgroup) VALUES('白帽子講安全協議','吳瀚請',NOW(),99.00,'電子工業出版社');

INSERT INTO tb_book(NAME , author , publishtime , price , publishgroup) VALUES('白帽子講安全協議2','吳瀚請',NOW(),99.00,'電子工業出版社');

Canal monitoring output:

23:36:57.673 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest -  BEGIN ----> Thread id: 10
23:36:57.677 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - 
----------------> binlog[mysql-bin.000004:1111] , name[canal,tb_book] , eventType : INSERT , executeTime : 1599233817000(2020-09-04 23:36:57) , gtid : () , delay : 673 ms

23:36:57.677 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - id : 1    type=INT(11)    update=true

23:36:57.677 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - name : 白帽子講安全協議    type=VARCHAR(100)    update=true

23:36:57.677 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - author : 吳瀚請    type=VARCHAR(100)    update=true

23:36:57.678 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - publishtime : 2020-09-04 15:36:57    type=DATETIME    update=true

23:36:57.678 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - price : 99.0    type=DOUBLE(10,2)    update=true

23:36:57.678 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - publishgroup : 電子工業出版社    type=VARCHAR(100)    update=true

23:36:57.680 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - ----------------

23:36:57.680 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest -  END ----> transaction id: 64
23:36:57.680 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - 
================> binlog[mysql-bin.000004:1224] , executeTime : 1599233817000(2020-09-04 23:36:57) , gtid : () , delay : 678ms

23:36:57.680 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - 
================> binlog[mysql-bin.000004:1320] , executeTime : 1599233817000(2020-09-04 23:36:57) , gtid : () , delay : 680ms

23:36:57.683 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest -  BEGIN ----> Thread id: 10
23:36:57.684 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - 
----------------> binlog[mysql-bin.000004:1465] , name[canal,tb_book] , eventType : INSERT , executeTime : 1599233817000(2020-09-04 23:36:57) , gtid : () , delay : 683 ms

23:36:57.684 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - id : 2    type=INT(11)    update=true

23:36:57.684 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - name : 白帽子講安全協議2    type=VARCHAR(100)    update=true

23:36:57.684 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - author : 吳瀚請    type=VARCHAR(100)    update=true

23:36:57.684 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - publishtime : 2020-09-04 15:36:57    type=DATETIME    update=true

23:36:57.684 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - price : 99.0    type=DOUBLE(10,2)    update=true

23:36:57.684 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - publishgroup : 電子工業出版社    type=VARCHAR(100)    update=true

23:36:57.684 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - ----------------

23:36:57.684 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest -  END ----> transaction id: 65
23:36:57.684 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - 
================> binlog[mysql-bin.000004:1580] , executeTime : 1599233817000(2020-09-04 23:36:57) , gtid : () , delay : 684ms
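
In the INSERT output above, `publishtime` reads 15:36:57 while the formatted `executeTime` reads 23:36:57. One plausible explanation, offered as an assumption because the article does not state the time zones involved, is that `NOW()` was evaluated by a MySQL server running in UTC while the client JVM formats timestamps in UTC+8. The sketch formats the same `executeTime` value in both zones. `ExecuteTimeZones` is an illustrative name.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class ExecuteTimeZones {

    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    /** Formats an epoch-millisecond timestamp in the given zone. */
    static String format(long epochMillis, ZoneId zone) {
        return FMT.withZone(zone).format(Instant.ofEpochMilli(epochMillis));
    }

    public static void main(String[] args) {
        long executeTime = 1599233817000L; // value taken from the log output above
        System.out.println(format(executeTime, ZoneOffset.UTC));              // 2020-09-04 15:36:57
        System.out.println(format(executeTime, ZoneId.of("Asia/Shanghai")));  // 2020-09-04 23:36:57
    }
}
```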

3.1.3 Updating Data

Run the SQL statement:

UPDATE tb_book SET NAME = '白帽子講安全協議第二版' WHERE id = 2;

Canal monitoring output:

23:39:30.367 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest -  BEGIN ----> Thread id: 10
23:39:30.367 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - 
----------------> binlog[mysql-bin.000004:1813] , name[canal,tb_book] , eventType : UPDATE , executeTime : 1599233970000(2020-09-04 23:39:30) , gtid : () , delay : 367 ms

23:39:30.367 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - id : 2    type=INT(11)

23:39:30.367 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - name : 白帽子講安全協議第二版    type=VARCHAR(100)    update=true

23:39:30.367 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - author : 吳瀚請    type=VARCHAR(100)

23:39:30.367 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - publishtime : 2020-09-04 15:36:57    type=DATETIME

23:39:30.367 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - price : 99.0    type=DOUBLE(10,2)

23:39:30.367 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - publishgroup : 電子工業出版社    type=VARCHAR(100)

23:39:30.367 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - ----------------

23:39:30.367 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest -  END ----> transaction id: 72
23:39:30.367 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - 
================> binlog[mysql-bin.000004:2016] , executeTime : 1599233970000(2020-09-04 23:39:30) , gtid : () , delay : 367ms
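
In the UPDATE output above only `name` carries `update=true`, because `printColumn` appends that flag only when `column.getUpdated()` returns true. A consumer that cares only about changed fields can filter on the same flag. The sketch below uses a hypothetical `Col` class in place of Canal's `CanalEntry.Column` so that it runs without the Canal jars. With the real API the same loop would read `getName()` and `getUpdated()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class ChangedColumns {

    /** Hypothetical stand-in for CanalEntry.Column with only the fields this example needs. */
    static final class Col {
        final String name;
        final String value;
        final boolean updated;

        Col(String name, String value, boolean updated) {
            this.name = name;
            this.value = value;
            this.updated = updated;
        }
    }

    /** Returns the names of the columns whose updated flag is set. */
    static List<String> changedNames(List<Col> afterColumns) {
        List<String> names = new ArrayList<>();
        for (Col column : afterColumns) {
            if (column.updated) {
                names.add(column.name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // Values copied from the UPDATE output above.
        List<Col> after = Arrays.asList(
                new Col("id", "2", false),
                new Col("name", "白帽子講安全協議第二版", true),
                new Col("author", "吳瀚請", false));
        System.out.println(changedNames(after)); // [name]
    }
}
```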

3.1.4 Deleting Data

Run the SQL:

DELETE FROM tb_book WHERE id = 1;

Canal monitoring output:

23:40:02.977 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - 
================> binlog[mysql-bin.000004:2112] , executeTime : 1599234003000(2020-09-04 23:40:03) , gtid : () , delay : -23ms

23:40:02.977 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest -  BEGIN ----> Thread id: 10
23:40:02.978 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - 
----------------> binlog[mysql-bin.000004:2249] , name[canal,tb_book] , eventType : DELETE , executeTime : 1599234003000(2020-09-04 23:40:03) , gtid : () , delay : -23 ms

23:40:02.978 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - id : 1    type=INT(11)
23:40:02.978 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - name : 白帽子講安全協議    type=VARCHAR(100)
23:40:02.978 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - author : 吳瀚請    type=VARCHAR(100)
23:40:02.978 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - publishtime : 2020-09-04 15:36:57    type=DATETIME
23:40:02.978 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - price : 99.0    type=DOUBLE(10,2)
23:40:02.978 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - publishgroup : 電子工業出版社    type=VARCHAR(100)
23:40:02.978 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - ----------------
23:40:02.979 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest -  END ----> transaction id: 79
23:40:02.979 [Thread-0] INFO  c.d.canal.AbstractCanalClientTest - 
================> binlog[mysql-bin.000004:2362] , executeTime : 1599234003000(2020-09-04 23:40:03) , gtid : () , delay : -22ms
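
The DELETE output reports a negative delay. `printEntry` computes the delay as the client's current time minus the entry's `executeTime`, so a negative value simply means the client clock read earlier than the timestamp in the binlog header. Because those header timestamps are whole seconds in this log and the two clocks probably belong to different machines, a small clock difference is a likely cause, though that is an assumption. The sketch reproduces the arithmetic with values taken from the log. `DelayCheck` and `clampedDelayMillis` are illustrative names.

```java
final class DelayCheck {

    /** Same formula as printEntry: client time minus the binlog execute time. */
    static long delayMillis(long clientNowMillis, long executeTimeMillis) {
        return clientNowMillis - executeTimeMillis;
    }

    /** Optional variant that reports 0 instead of a negative delay. */
    static long clampedDelayMillis(long clientNowMillis, long executeTimeMillis) {
        return Math.max(0L, delayMillis(clientNowMillis, executeTimeMillis));
    }

    public static void main(String[] args) {
        long executeTime = 1599234003000L; // 23:40:03.000 in the log's time zone
        long clientNow = 1599234002977L;   // 23:40:02.977, the timestamp of the log line
        System.out.println(delayMillis(clientNow, executeTime));        // -23
        System.out.println(clampedDelayMillis(clientNow, executeTime)); // 0
    }
}
```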

