前言
本篇只介紹跟 高可用 相關的配置。
- TCP模式 請參考文章:【Canal——增量同步MySQL數據到ElasticSearch】
Canal server 和 client 端的高可用方案依賴 zookeeper, 啟動 canal server 和 client 的時候都會 zookeeper 讀取信息. Canal 在 zookeeper 存儲的數據結構如下:
/otter └── canal └── destinations └── flight_segment # canal 實例名稱 ├── 1001 # canal client 信息 │ ├── cursor # 當前消費的 mysql binlog 位點 │ ├── filter # binlog 過濾條件 │ └── running # 當前正在運行的 canal client 服務器 ├── cluster # canal server 列表 │ └── 10.93.61.86:11111 └── running # 當前正在運行的 canal server 服務器
Canal server 和 client 啟動的時候都會去搶占 zk 對應的 running 節點, 保證只有一個 server 和 client 在運行, 而 server 和 client 的高可用切換也是基於監聽 running 節點進行的.
一、架構
配置說明:
- zookeeper x 3 + canal x 2 + mysql x 2
組件說明:
- linux內核版本(CentOS Linux 7):(命令:uname -a)
Linux slave1 3.10.0-693.el7.x86_64 #1 SMP Tue Aug 22 21:09:27 UTC 2017 x86_64 x86_64 x86_64 GNU/Linux
- mysql版本:(SQL命令:select version(); 或 status)
Server version: 5.6.43-log MySQL Community Server (GPL)
- canal版本:canal-1.1.4
- zookeeper版本:zookeeper-3.4.14
- JDK版本: 1.8
二、搭建zookeeper集群
192.168.175.20:2181,192.168.175.21:2181,192.168.175.22:2181
,具體搭建流程,可查看文章:
【Zookeepr3.4.5集群搭建】。
三、搭建canal server集群
前提: mysql已打開binlog功能,且配置binlog模式為row。具體配置,可查看文章:【增量同步MySQL數據到ElasticSearch】。
1. 下載最新canal安裝包
下載地址: https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
2.上傳並解壓
進入192.168.175.20服務器,使用rz命令上傳,使用如下命令進行解壓至/usr/local/hadoop/app/canal
:
tar xzvf canal.deployer-1.1.4.tar.gz -C canal
3. 修改配置instance.properties
新解壓的文件夾/usr/local/hadoop/app/canal/conf/
有一個example
文件夾,一個example就代表一個instance實例.而一個instance實例就是一個消息隊列,
所以這里可以將文件名改為example1,同時再復制出來一個叫example2.(命名可以使用監聽的數據庫名)。
修改/usr/local/hadoop/app/canal/conf/example1/instance.properties
配置文件:
canal.instance.master.address=192.168.175.21:3306 canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 canal.mq.topic=example1
修改/usr/local/hadoop/app/canal/conf/example2/instance.properties
配置文件:
canal.instance.master.address=192.168.175.22:3306 canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 canal.mq.topic=example2
配置文件參數說明,可查看:https://github.com/alibaba/canal/wiki/AdminGuide
4. 修改配置canal.properties
配置/usr/local/hadoop/app/canal/conf/canal.properties
是一個對應canal server的全局配置(instance.properties是對應canal instance的配置)。
canal.id = 2 #保證每個canal server的id不同 canal.port = 11111 canal.zkServers =192.168.175.20:2181,192.168.175.21:2181,192.168.175.22:2181 canal.instance.global.spring.xml = classpath:spring/default-instance.xml #其他配置默認即可.
注意: 兩台機器上的instance目錄的名字需要保證完全一致,HA模式是依賴於instance name進行管理,同時必須都選擇default-instance.xml配置。
配置完成,將文件從192.168.175.20遠程復制一份到192.168.175.22上:
#需要確保已開通免密
scp -rp /usr/local/hadoop/app/canal slave2:/usr/local/hadoop/app/
5. 啟動canal server
分別進入2台服務器的文件夾/usr/local/hadoop/app/canal/bin
執行如下啟動命令:
./startup.sh
- 查看 server 日志:
/usr/local/hadoop/app/canal/logs/canal/canal.log,
出現如下內容,即表示啟動成功:
2019-06-07 21:15:03.372 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations 2019-06-07 21:15:03.427 [main] INFO c.a.o.c.d.monitor.remote.RemoteConfigLoaderFactory - ## load local canal configurations 2019-06-07 21:15:03.529 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## start the canal server. 2019-06-07 21:15:06.251 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.175.22:11111] 2019-06-07 21:15:22.245 [main] INFO com.alibaba.otter.canal.deployer.CanalStater - ## the canal server is running now ......
- 查看 instance 的日志:/usr/local/hadoop/app/canal/logs/example1/example1.log 和 /usr/local/hadoop/app/canal/logs/example2/example2.log,出現如下內容,即表示啟動成功:
注:只會看到一台機器上出現了以上instance啟動成功的日志,即 192.168.175.20 和 192.168.175.22 只會有1台有以上日志輸出。
6. 驗證canal server
- 在zk中查看canal server節點注冊情況:
[zk: localhost:2181(CONNECTED) 27] ls2 /otter/canal/destinations [example2, example1] [zk: localhost:2181(CONNECTED) 26] ls2 /otter/canal/cluster [192.168.175.22:11111, 192.168.175.20:11111]
可以看到canal server節點已經在zk集群上注冊成功。當停掉一個canal server時,可以看到zk上對應的臨時節點也會刪除.
- 在zk中查看canal server當前正在工作的節點:
[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example1/running {"active":true,"address":"192.168.175.20:11111"}
- canal server 自動平滑切換:
先停止正在工作的 192.168.175.20 的 canal server:
bin/stop.sh
這時 192.168.175.22 會立馬啟動example instance,提供新的數據服務:
[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example1/running {"active":true,"address":"192.168.175.22:11111"}
與此同時,客戶端也會隨着canal server的切換,通過獲取zookeeper中的最新地址,與新的canal server建立鏈接,繼續消費數據,整個過程自動完成。
四、搭建canal client
使用canal client通過zookeeper連接canal server集群。注意運行canal客戶端代碼時,一定要先啟動canal server!!!
1. 代碼實現
- 添加pom依賴:
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.3</version> </dependency>
- canal client代碼:
package com.xgh.canal; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry.*; import com.alibaba.otter.canal.protocol.Message; import java.net.InetSocketAddress; import java.util.List; public class TestCanalByZk { public static void main(String args[]) { String zkHost="192.168.175.20:2181,192.168.175.21:2181,192.168.175.22:2181"; // 創建鏈接 CanalConnector connector = CanalConnectors.newClusterConnector(zkHost,"example1","",""); /*CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.175.22", 11111), "example", "", "");*/ int batchSize = 1000; int emptyCount = 0; long batchId = 0; //外層死循環:在canal節點宕機后,拋出異常,等待zk對canal處理切換,切換完后,繼續創建連接處理數據 while(true) { try { connector.connect(); connector.subscribe(".*\\..*");//訂閱所有庫下面的所有表 //connector.subscribe("canal.t_canal");//訂閱庫canal庫下的表t_canal connector.rollback(); //內層死循環:按頻率實時監聽數據變化,一旦收到變化數據,立即做消費處理,並ack,考慮消費速度,可以做異步處理並ack. while (true) { Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據 batchId = message.getId(); int size = message.getEntries().size(); //// 偏移量不等於-1 或者 獲取的數據條數不為0 時,認為拿到消息,並處理 if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount);//此時代表當前數據庫無遍更數據 Thread.sleep(200); //200ms拉一次變動數據 } else { emptyCount = 0; System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交確認 } }catch(Exception e){ e.printStackTrace(); connector.rollback(batchId); // 處理失敗, 回滾數據 } finally { connector.disconnect(); } } } private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } System.out.println("rowChare ======>"+rowChage.toString()); EventType eventType = rowChage.getEventType(); //事件類型,比如insert,update,delete System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(),//mysql的my.cnf配置中的log-bin名稱 entry.getHeader().getLogfileOffset(), //偏移量 entry.getHeader().getSchemaName(),//庫名 entry.getHeader().getTableName(), //表名 eventType));//事件名 for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }
canal功能含義:
- 連接,connector.connect() - 訂閱,connector.subscribe - 獲取數據,connector.getWithoutAck() - 業務處理 - 提交確認,connector.ack() - 回滾,connector.rollback() - 斷開連接,connector.disconnect()
canal client運行實例:
empty count : 1 empty count : 2 empty count : 3 empty count : 4
2. 驗證canal client
- 觸發數據庫變更:
創建庫:create database canal;
創建表:create table t_canal (id int,name varchar(20),status int);
插入數據:insert into t_canal values(11,'xxiao',1);
canal client 輸出日志:
================> binlog[mysql-bin.000001:6973] , name[canal,t_canal] , eventType : INSERT id : 11 update=true name : xxiao update=true status : 1 update=true
- 在zk中查看正在連接的 canal client 節點:
[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example1/1001/running
{"active":true,"address":"192.168.175.18:11111","clientId":1001}
- 在zk中查看最后一次消費成功的binlog位點:
數據消費成功后,canal server會在zookeeper中記錄下當前最后一次消費成功的binlog位點. (下次你重啟client時,會從這最后一個位點繼續進行消費)。
[zk: localhost:2181(CONNECTED) 16] get /otter/canal/destinations/example1/1001/cursor {"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"10.20.144.15","port":3306}},"postion":{"included":false,"journalName":"mysql-bin.002253","position":2574756,"timestamp":1363688722000}}
五、其它
1. canal數據結構
canal的數據傳輸有兩塊,一塊是進行binlog訂閱時,binlog轉換為我們所定義的Message,第二塊是client與server進行TCP交互時,傳輸的TCP協議。
Entry數據結構:
Entry Header version [協議的版本號,default = 1] logfileName [binlog文件名] logfileOffset [binlog position] serverId [服務端serverId] serverenCode [變更數據的編碼] executeTime [變更數據的執行時間] sourceType [變更數據的來源,default = MYSQL] schemaName [變更數據的schemaname] tableName [變更數據的tablename] eventLength [每個event的長度] eventType [insert/update/delete類型,default = UPDATE] props [預留擴展] gtid [當前事務的gitd] entryType [事務頭BEGIN/事務尾END/數據ROWDATA/HEARTBEAT/GTIDLOG] storeValue [byte數據,可展開,對應的類型為RowChange] RowChange tableId [tableId,由數據庫產生] eventType [數據變更類型,default = UPDATE] isDdl [標識是否是ddl語句,比如create table/drop table] sql [ddl/query的sql語句] rowDatas [具體insert/update/delete的變更數據,可為多條,1個binlog event事件可對應多條變更,比如批處理] beforeColumns [字段信息,增量數據(修改前,刪除前),Column類型的數組] afterColumns [字段信息,增量數據(修改后,新增后),Column類型的數組] props [預留擴展] props [預留擴展] ddlSchemaName [ddl/query的schemaName,會存在跨庫ddl,需要保留執行ddl的當前schemaName] Column index [字段下標] sqlType [jdbc type] name [字段名稱(忽略大小寫),在mysql中是沒有的] isKey [是否為主鍵] updated [是否發生過變更] isNull [值是否為null] props [預留擴展] value [字段值,timestamp,Datetime是一個時間格式的文本] length [對應數據對象原始長度] mysqlType [字段mysql類型]
六、總結
1. 啟動兩個監聽example1的canal client,啟動兩個監聽example2的canal client:
在example1或example2對應的數據發生變化時,兩個canal client只有一個消費消息。
當兩個監聽同一個隊列的canal client有一個宕掉時,再有數據變化時,剩下的一個canal client就會開始消費數據。
這就驗證了canal client的HA機制:為了保證有序性,一份instance同一時間只能由一個canal client進行get/ack/rollback操作,否則客戶端接收無法保證有序.
2. 啟動兩個canal server並在zk上注冊
當停掉其中一個canal server時,當產生數據變化時,整個canal server集群仍可以正常對外提供服務。
這就驗證了canal server的HA機制:為了減少對mysql dump的請求,不同server上的instance要求同一時間只能有一個處於running,其他的處於standby狀態.
3. 在canal server切換過程中,canal client存在重復消費數據的問題
這點需要在消費端自行進行處理。
參考: