Canal——高可用架構設計與應用


 

前言

本篇只介紹跟 高可用 相關的配置。

Canal server 和 client 端的高可用方案依賴 zookeeper, 啟動 canal server 和 client 的時候都會 zookeeper 讀取信息. Canal 在 zookeeper 存儲的數據結構如下:

/otter
└── canal
    └── destinations
        └── flight_segment  # canal 實例名稱 
            ├── 1001 # canal client 信息
            │   ├── cursor # 當前消費的 mysql binlog 位點
            │   ├── filter # binlog 過濾條件
            │   └── running  # 當前正在運行的 canal client 服務器
            ├── cluster # canal server 列表
            │   └── 10.93.61.86:11111 
            └── running # 當前正在運行的 canal server 服務器

Canal server 和 client 啟動的時候都會去搶占 zk 對應的 running 節點, 保證只有一個 server 和 client 在運行, 而 server 和 client 的高可用切換也是基於監聽 running 節點進行的.

 

一、架構

配置說明:

  • zookeeper x 3 + canal x 2 + mysql x 2

組件說明:

  • linux內核版本(CentOS Linux 7):(命令:uname -a)

    Linux slave1 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux

  • mysql版本:(SQL命令:select version(); 或 status)

    Server version: 5.6.43-log MySQL Community Server (GPL)

  • canal版本:canal-1.1.4
  • zookeeper版本:zookeeper-3.4.14
  • JDK版本: 1.8
 

二、搭建zookeeper集群    

搭建zookeeper集群地址為 192.168.175.20:2181,192.168.175.21:2181,192.168.175.22:2181,具體搭建流程,可查看文章: 【Zookeepr3.4.5集群搭建】

三、搭建canal server集群

前提: mysql已打開binlog功能,且配置binlog模式為row。具體配置,可查看文章:【增量同步MySQL數據到ElasticSearch】

1. 下載最新canal安裝包

下載地址: https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

2.上傳並解壓

進入192.168.175.20服務器,使用rz命令上傳,使用如下命令進行解壓至/usr/local/hadoop/app/canal:

tar xzvf canal.deployer-1.1.4.tar.gz -C canal

3. 修改配置instance.properties

新解壓的文件夾/usr/local/hadoop/app/canal/conf/有一個example文件夾,一個example就代表一個instance實例.而一個instance實例就是一個消息隊列,

所以這里可以將文件名改為example1,同時再復制出來一個叫example2.(命名可以使用監聽的數據庫名)。

修改/usr/local/hadoop/app/canal/conf/example1/instance.properties配置文件:

canal.instance.master.address=192.168.175.21:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.mq.topic=example1

修改/usr/local/hadoop/app/canal/conf/example2/instance.properties配置文件:

canal.instance.master.address=192.168.175.22:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
canal.mq.topic=example2

配置文件參數說明,可查看:https://github.com/alibaba/canal/wiki/AdminGuide

4. 修改配置canal.properties

配置/usr/local/hadoop/app/canal/conf/canal.properties是一個對應canal server的全局配置(instance.properties是對應canal instance的配置)。

canal.id = 2  #保證每個canal server的id不同
canal.port = 11111
canal.zkServers =192.168.175.20:2181,192.168.175.21:2181,192.168.175.22:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
#其他配置默認即可.

注意: 兩台機器上的instance目錄的名字需要保證完全一致,HA模式是依賴於instance name進行管理,同時必須都選擇default-instance.xml配置。

配置完成,將文件從192.168.175.20遠程復制一份到192.168.175.22上:

#需要確保已開通免密
scp -rp /usr/local/hadoop/app/canal slave2:/usr/local/hadoop/app/

5. 啟動canal server

分別進入2台服務器的文件夾/usr/local/hadoop/app/canal/bin執行如下啟動命令:

./startup.sh
  • 查看 server 日志:/usr/local/hadoop/app/canal/logs/canal/canal.log,出現如下內容,即表示啟動成功:
2019-06-07 21:15:03.372 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-06-07 21:15:03.427 [main] INFO  c.a.o.c.d.monitor.remote.RemoteConfigLoaderFactory - ## load local canal configurations
2019-06-07 21:15:03.529 [main] INFO  com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server.
2019-06-07 21:15:06.251 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.175.22:11111]
2019-06-07 21:15:22.245 [main] INFO  com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......
  • 查看 instance 的日志:/usr/local/hadoop/app/canal/logs/example1/example1.log  /usr/local/hadoop/app/canal/logs/example2/example2.log,出現如下內容,即表示啟動成功:

 

注:只會看到一台機器上出現了以上instance啟動成功的日志,即 192.168.175.20 和 192.168.175.22 只會有1台有以上日志輸出。

6. 驗證canal server

  • 在zk中查看canal server節點注冊情況:
[zk: localhost:2181(CONNECTED) 27] ls2 /otter/canal/destinations
[example2, example1]
[zk: localhost:2181(CONNECTED) 26] ls2 /otter/canal/cluster
[192.168.175.22:11111, 192.168.175.20:11111]

可以看到canal server節點已經在zk集群上注冊成功。當停掉一個canal server時,可以看到zk上對應的臨時節點也會刪除.

  • zk中查看canal server當前正在工作的節點:
[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example1/running
{"active":true,"address":"192.168.175.20:11111"}
  • canal server 自動平滑切換:

先停止正在工作的 192.168.175.20 的 canal server:

bin/stop.sh

這時 192.168.175.22 會立馬啟動example instance,提供新的數據服務:

[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example1/running
{"active":true,"address":"192.168.175.22:11111"}

與此同時,客戶端也會隨着canal server的切換,通過獲取zookeeper中的最新地址,與新的canal server建立鏈接,繼續消費數據,整個過程自動完成。

 

四、搭建canal client

使用canal client通過zookeeper連接canal server集群。注意運行canal客戶端代碼時,一定要先啟動canal server!!!

1. 代碼實現

  • 添加pom依賴:
    <dependency>
      <groupId>com.alibaba.otter</groupId>
      <artifactId>canal.client</artifactId>
      <version>1.1.3</version>
    </dependency>
  • canal client代碼:
package com.xgh.canal;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.*;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;

public class TestCanalByZk {

    public static void main(String args[]) {
        String zkHost="192.168.175.20:2181,192.168.175.21:2181,192.168.175.22:2181";
        // 創建鏈接
        CanalConnector connector = CanalConnectors.newClusterConnector(zkHost,"example1","","");
        /*CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.175.22", 11111),
                "example", "", "");*/
        int batchSize = 1000;
        int emptyCount = 0;
        long batchId = 0;
        //外層死循環:在canal節點宕機后,拋出異常,等待zk對canal處理切換,切換完后,繼續創建連接處理數據
        while(true) {
            try {
                connector.connect();
                connector.subscribe(".*\\..*");//訂閱所有庫下面的所有表
                //connector.subscribe("canal.t_canal");//訂閱庫canal庫下的表t_canal
                connector.rollback();
                //內層死循環:按頻率實時監聽數據變化,一旦收到變化數據,立即做消費處理,並ack,考慮消費速度,可以做異步處理並ack.
                while (true) {
                    Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據
                    batchId = message.getId();
                    int size = message.getEntries().size();
                    //// 偏移量不等於-1 或者 獲取的數據條數不為0 時,認為拿到消息,並處理
                    if (batchId == -1 || size == 0) {
                        emptyCount++;
                        System.out.println("empty count : " + emptyCount);//此時代表當前數據庫無遍更數據
                        Thread.sleep(200); //200ms拉一次變動數據
                    } else {
                        emptyCount = 0;
                        System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                        printEntry(message.getEntries());
                    }

                    connector.ack(batchId); // 提交確認
                }

            }catch(Exception e){
                e.printStackTrace();
                connector.rollback(batchId); // 處理失敗, 回滾數據
            } finally {
                connector.disconnect();
            }
        }
    }

    private static void printEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }

            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }
            System.out.println("rowChare ======>"+rowChage.toString());

            EventType eventType = rowChage.getEventType(); //事件類型,比如insert,update,delete
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(),//mysql的my.cnf配置中的log-bin名稱
                    entry.getHeader().getLogfileOffset(), //偏移量
                    entry.getHeader().getSchemaName(),//庫名
                    entry.getHeader().getTableName(), //表名
                    eventType));//事件名

            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    printColumn(rowData.getBeforeColumnsList());
                } else if (eventType == EventType.INSERT) {
                    printColumn(rowData.getAfterColumnsList());
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    printColumn(rowData.getAfterColumnsList());
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
        }
    }
}

canal功能含義:

- 連接,connector.connect() 
- 訂閱,connector.subscribe 
- 獲取數據,connector.getWithoutAck() 
- 業務處理 
- 提交確認,connector.ack() 
- 回滾,connector.rollback() 
- 斷開連接,connector.disconnect()

canal client運行實例:

empty count : 1
empty count : 2
empty count : 3
empty count : 4

2. 驗證canal client

  • 觸發數據庫變更:

創建庫:create database canal;
創建表:create table t_canal (id int,name varchar(20),status int);
插入數據:insert into t_canal values(11,'xxiao',1);

canal client 輸出日志:

================> binlog[mysql-bin.000001:6973] , name[canal,t_canal] , eventType : INSERT
id : 11        update=true
name : xxiao     update=true
status : 1      update=true
  • zk中查看正在連接的 canal client 節點:
[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example1/1001/running
{"active":true,"address":"192.168.175.18:11111","clientId":1001}
  • zk中查看最后一次消費成功的binlog位點:

數據消費成功后,canal server會在zookeeper中記錄下當前最后一次消費成功的binlog位點.  (下次你重啟client時,會從這最后一個位點繼續進行消費)。

[zk: localhost:2181(CONNECTED) 16] get /otter/canal/destinations/example1/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"10.20.144.15","port":3306}},"postion":{"included":false,"journalName":"mysql-bin.002253","position":2574756,"timestamp":1363688722000}}

 

五、其它

1. canal數據結構

canal的數據傳輸有兩塊,一塊是進行binlog訂閱時,binlog轉換為我們所定義的Message,第二塊是client與server進行TCP交互時,傳輸的TCP協議。
Entry數據結構:

Entry
    Header
        version         [協議的版本號,default = 1]
        logfileName     [binlog文件名]
        logfileOffset   [binlog position]
        serverId        [服務端serverId]
        serverenCode    [變更數據的編碼]
        executeTime     [變更數據的執行時間]
        sourceType      [變更數據的來源,default = MYSQL]
        schemaName      [變更數據的schemaname]
        tableName       [變更數據的tablename]
        eventLength     [每個event的長度]
        eventType       [insert/update/delete類型,default = UPDATE]
        props           [預留擴展]
        gtid            [當前事務的gitd]
    entryType           [事務頭BEGIN/事務尾END/數據ROWDATA/HEARTBEAT/GTIDLOG]
    storeValue          [byte數據,可展開,對應的類型為RowChange]    
RowChange
    tableId             [tableId,由數據庫產生]
    eventType           [數據變更類型,default = UPDATE]
    isDdl               [標識是否是ddl語句,比如create table/drop table]
    sql                 [ddl/query的sql語句]
    rowDatas            [具體insert/update/delete的變更數據,可為多條,1個binlog event事件可對應多條變更,比如批處理]
        beforeColumns   [字段信息,增量數據(修改前,刪除前),Column類型的數組]
        afterColumns    [字段信息,增量數據(修改后,新增后),Column類型的數組] 
        props           [預留擴展]
    props               [預留擴展]
    ddlSchemaName       [ddl/query的schemaName,會存在跨庫ddl,需要保留執行ddl的當前schemaName]
Column 
    index               [字段下標]      
    sqlType             [jdbc type]
    name                [字段名稱(忽略大小寫),在mysql中是沒有的]
    isKey               [是否為主鍵]
    updated             [是否發生過變更]
    isNull              [值是否為null]
    props               [預留擴展]
    value               [字段值,timestamp,Datetime是一個時間格式的文本]
    length              [對應數據對象原始長度]
    mysqlType           [字段mysql類型]

 

 

六、總結

1. 啟動兩個監聽example1的canal client,啟動兩個監聽example2的canal client:

在example1或example2對應的數據發生變化時,兩個canal client只有一個消費消息。

當兩個監聽同一個隊列的canal client有一個宕掉時,再有數據變化時,剩下的一個canal client就會開始消費數據。

這就驗證了canal client的HA機制:為了保證有序性,一份instance同一時間只能由一個canal client進行get/ack/rollback操作,否則客戶端接收無法保證有序.

 

2. 啟動兩個canal server並在zk上注冊

當停掉其中一個canal server時,當產生數據變化時,整個canal server集群仍可以正常對外提供服務。

這就驗證了canal server的HA機制:為了減少對mysql dump的請求,不同server上的instance要求同一時間只能有一個處於running,其他的處於standby狀態.

 

3. 在canal server切換過程中,canal client存在重復消費數據的問題

這點需要在消費端自行進行處理。





參考:
https://www.jianshu.com/p/31121d146d69


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM