canal使用小結


一、基本概念

  mysql本身支持主從備份,原理就是主庫master生成的binlog文件記錄了所有的增刪改操作語句,然后slave向master發送dump協議,master將binlog日志文件推送給從庫slave解析執行,達到數據一致備份的目的。

  canal,基於java開發,偽裝成一個slave,去監聽獲取增量的binlog日志文件,然后解析處理獲得的相關數據(過程中可以加入自由的加入一些額外的功能性代碼需求),利用獲得的數據,可以用其他不同用途,比如同步到es中做搜索相關。

二、canal基本配置使用

  測試環境:windows、mysql 5.7.26、canal 1.1.3、Navicat for MySQL。

  1、mysql安裝和配置

    1.1、下載安裝解壓忽略。進入mysql解壓后目錄,新增data文件夾。

    1.2、新增my.ini文件,添加配置:

[client]
# 設置mysql客戶端連接服務端時默認使用的端口
port=3311
[mysql]
default-character-set=utf8
[mysqld]
character-set-server=utf8
port=3311
# 默認存儲引擎innoDB
default-storage-engine=INNODB
# Server Id.數據庫服務器id,這個id用來在主從服務器中標記唯一mysql服務器
server-id=1
datadir=E:\\soft\\mysql2\\data
bind-address=0.0.0.0
# 開啟binlog日志
log-bin=mysql-bin
binlog_format = ROW

    1.3、cmd進入並目錄,啟動/關閉 mysql:

//啟動
net start mysql
//關閉
net stop mysql

    1.4、連接mysql並設置密碼

    連接:mysql -uroot -p,初始密碼為空,一直按enter即可進入mysql命令行。

    進入后設置密碼:

// 切換庫
use mysql;
// 設置密碼
update user set authentication_string=PASSWORD("123456") where user="root";
// 刷新生效
 flush privileges;

    設置成功后,quit退出重進,輸入密碼123456。

    1.5、新增個canal的訪問賬戶

// 新增用戶
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
// 授權
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
// 刷新
FLUSH PRIVILEGES;

  2、canal安裝配置

    下載canal包(https://github.com/alibaba/canal/releases),解壓本地目錄。

    2.1、目錄結構

    

      其中cfang是拷貝example,需要多個instance可繼續拷貝,再修改每個instance中的配置文件。

    2.1、配置canal.properties

    

      port可自定義,用於canal對外服務接口。destinations配置instance列表(連接db)。

    2.2、配置instance.properties

    

      其中canal.instance.defaultDatabaseName可不配置,全庫掃描。

    2.3、啟動

      bin目錄,點擊startup.bat,查看/logs/canal/canal.log日志文件,出現以下則為開啟成功:

    2.4、canal數據格式:

Entry
    Header
        logfileName [binlog文件名]
        logfileOffset [binlog position]
        executeTime [發生的變更]
        schemaName
        tableName
        eventType [insert/update/delete類型]
    entryType   [事務頭BEGIN/事務尾END/數據ROWDATA]
    storeValue  [byte數據,可展開,對應的類型為RowChange]   
RowChange
    isDdl       [是否是ddl變更操作,比如create table/drop table]
    sql     [具體的ddl sql]
    rowDatas    [具體insert/update/delete的變更數據,可為多條,1個binlog event事件可對應多條變更,比如批處理]
        beforeColumns [Column類型的數組]
        afterColumns [Column類型的數組]     
Column
    index      
    sqlType     [jdbc type]
    name        [column name]
    isKey       [是否為主鍵]
    updated     [是否發生過變更]
    isNull      [值是否為null]
    value       [具體的內容,注意為文本]

    2.5、java程序測試

      pom導入:

		<dependency>
		    <groupId>com.alibaba.otter</groupId>
		    <artifactId>canal.client</artifactId>
		    <version>1.1.3</version>
		</dependency>

      java測試:

package com.cfang.prebo;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.stream.Collectors;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.protocol.Message;

public class CanalTest {

	public static void main(String[] args) throws Exception {
		
		CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1", 11111), "cfang", "", "");
		connector.connect();
		connector.subscribe(".*\\..*");
		connector.rollback();
		
		while (true) {
            Message message = connector.getWithoutAck(100);  // 獲取指定數量的數據
            long batchId = message.getId();
            if (batchId == -1 || message.getEntries().isEmpty()) {
                Thread.sleep(1000);
                continue;
            }
           // System.out.println(message.getEntries());
            printEntries(message.getEntries());
            connector.ack(batchId);// 提交確認,消費成功,通知server刪除數據
//            connector.rollback(batchId);// 處理失敗, 回滾數據,后續重新獲取數據
        }
	}
	
	private static void printEntries(List<Entry> entries) throws Exception {
        for (Entry entry : entries) {
            if (entry.getEntryType() != EntryType.ROWDATA) {
                continue;
            }

            RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
            
            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType));
            
            for (RowData rowData : rowChange.getRowDatasList()) {
                switch (rowChange.getEventType()) {
	                case INSERT:
	                	 System.out.println("INSERT ");
	                     printColumns(rowData.getAfterColumnsList());
	                     break;
	                case UPDATE:
	                    System.out.println("UPDATE ");
	                    printColumns(rowData.getAfterColumnsList());
	                    break;
	                case DELETE:
	                    System.out.println("DELETE ");
	                    printColumns(rowData.getBeforeColumnsList());
	                    break;
	
	                default:
	                    break;
	             }
            }
        }
    }

	private static void printColumns(List<Column> columns) {
        for(Column column : columns) {
        	System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

  Navicat中進行相關操作的時候,可在控台看到輸出,例如:

  

 

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM