- Canal 基礎認知
- canal簡介
Canal 譯意為水道/管道/溝渠,主要用途是基於 MySQL 數據庫增量日志解析,提供增量數據訂閱和消費,早期阿里巴巴因為杭州和美國雙機房部署,存在跨機房同步的業務需求,
實現方式主要是基於業務 trigger 獲取增量變更。從 2010 年開始,業務逐步嘗試數據庫日志解析獲取增量變更進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。 基於日志增量訂閱和消費的業務包括 - 數據庫鏡像 - 數據庫實時備份 - 索引構建和實時維護(拆分異構索引、倒排索引等) - 業務 cache 刷新 - 帶業務邏輯的增量數據處理 當前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x - canal原理
- MySQL主備復制原理
MySQL master(主)將數據變更寫入二進制日志( binary log, 其中記錄叫做二進制日志事件binary log events,可以通過 show binlog events 進行查看) MySQL slave(從)將 master 的 binary log events 拷貝到它的中繼日志(relay log) MySQL slave 重放 relay log 中事件,將數據變更反映它自己的數據
- Canal工作原理
canal 模擬 MySQL slave 的交互協議,偽裝自己為 MySQL slave ,向 MySQL master 發送dump 協議 MySQL master 收到 dump 請求,開始推送 binary log 給 slave (即 canal) canal 解析 binary log 對象(原始為 byte 流),用戶使用canal客戶端對數據進行進一步處理
- MySQL主備復制原理
- Canal官方文檔
- Canal源代碼
- Canal架構簡單說明
個人認知: 一個Canal Sever服務器中可以包含一個或者多個Instance(實例|目的地),每一個Instance就是一個任務處理線程,用於從不同的MySQL服務器或者同一個MySQL的
不同數據庫監聽收集binlog日志(canal監聽mysql的數據配置在example/instance.properties文件中),canal默認提供一個example實例,
如果需要多個實例在conf中創建一個文件夾instanceA,復制example/instance.properties文件到instanceA中並修改,在canal.properties文件中設置destinations時逗號分割,
如果沒有設置則實例無效- Canal安裝包文件目錄
conf/ ├── canal_local.properties ├── canal.properties ├── example │ ├── h2.mv.db │ ├── instance.properties │ └── meta.dat ├── logback.xml ├── metrics │ └── Canal_instances_tmpl.json └── spring ├── base-instance.xml ├── default-instance.xml ├── file-instance.xml ├── group-instance.xml ├── memory-instance.xml └── tsdb ├── h2-tsdb.xml ├── mysql-tsdb.xml ├── sql │ └── create_table.sql └── sql-map ├── sqlmap-config.xml ├── sqlmap_history.xml └── sqlmap_snapshot.xml
- 重要配置文件說明
- canal.properties
canal.properties這個配置文件負責的是canal服務的基礎配置,每個canal可以啟動多個實例instance,一個instance代表一個數據采集處理線程,
每個instance都有一個獨立的配置文件instance.properties,不同的instance可以采集不同的mysql數據庫,也就是一個canal可以對應多個mysql數據庫。在instance里面有一個小隊列,可以理解為是jvm級的隊列,instance抓取來的數據先放入到隊列中,隊列可以有很多出口: ①mq模式:canal server自己主動把數據推送到kafka,這個比較簡單,一行代碼不用寫,只需要配個kafka|rocketmq|rabbitmq的地址, 每個instance對應kafka的一個topic,數據是json串。 這種方式雖然簡單,但是其缺點主要體現在兩個個方面,一個instance對應一個topic,所有表都在這一個topic,所以實時的時候要進行分流。 另一方面,因為數據是json,並且攜帶了很多冗余信息,但是數據量大的時候傳輸效率比較低。 ②tcp模式:啟動canal客戶端連接到canal服務器端之后,主動去拉取數據,可以定義多長周期消費多少數據。缺點在於抓取出來的是序列化壓縮的數據, 所以需要反序列化,解壓,比較麻煩。優點在於我們可以進行壓縮,過濾掉沒用的冗余信息,只保留我們需要的信息,提交傳輸效率。
########################################################## common argument ############################################################### tcp bind ip |canal server綁定的本地IP信息,如果不配置,默認選擇一個本機IP進行啟動服務canal.ip =# register ip to zookeeper |canal server注冊到外部zookeeper、admin的ip信息 (針對docker的外部可見ip)canal.register.ip =# canal server提供socket服務的端口,端口號,是給tcp模式(netty)時候用的,如果用了kafka或者rocketmq,就不需要配置這個端口canal.port = 11111canal.metrics.pull.port = 11112# canal instance user/passwd canal數據端口訂閱的ACL配置 (v1.1.4新增)如果為空,代表不開啟#canal.user = canal# canal數據端口訂閱的ACL配置 (v1.1.4新增)如果為空,代表不開啟#canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458# canal admin config|canal鏈接canal-admin的地址 (v1.1.4新增)canal.admin.manager = 127.0.0.1:8089# admin管理指令鏈接端口 (v1.1.4新增)canal.admin.port = 11110# admin管理指令鏈接的ACL配置 (v1.1.4新增)canal.admin.user = admin# admin管理指令鏈接的ACL配置 (v1.1.4新增)canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441# canal server鏈接zookeeper集群的鏈接信息,單機部署不需要配置,集群部署需要配置canal.zkServers =# flush data to zk| canal持久化數據到zookeeper上的更新頻率,單位毫秒canal.zookeeper.flush.period = 1000canal.withoutNetty = false# tcp, kafka, RocketMQ| Canal模式設定,主要有tcp和mq兩種類型canal.serverMode = tcp# flush meta cursor/parse position to file|主要針對h2-tsdb.xml時對應h2文件的存放目錄,默認為conf/xx/h2.mv.dbcanal.file.data.dir = ${canal.conf.dir}canal.file.flush.period = 1000## memory store RingBuffer size, should be Math.pow(2,n) |canal內存store中可緩存buffer記錄數,需要為2的指數canal.instance.memory.buffer.size = 16384## memory store RingBuffer used memory unit size , default 1kb | 內存記錄的單位大小,默認1KB,和buffer.size組合決定最終的內存使用大小canal.instance.memory.buffer.memunit = 1024## meory store gets mode used MEMSIZE or ITEMSIZE | canal內存store中數據緩存模式## 1. ITEMSIZE : 根據buffer.size進行限制,只限制記錄的數量## 2. MEMSIZE : 根據buffer.size * buffer.memunit的大小,限制緩存記錄的大小canal.instance.memory.batch.mode = MEMSIZEcanal.instance.memory.rawEntry = true## detecing config |是否開啟心跳檢查canal.instance.detecting.enable = false# 心跳檢查sql#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()canal.instance.detecting.sql = select 1# 心跳檢查頻率,單位秒canal.instance.detecting.interval.time = 3# 心跳檢查失敗重試次數canal.instance.detecting.retry.threshold = 3# 心跳檢查失敗后,是否開啟自動mysql自動切換# 說明:比如心跳檢查失敗超過閥值后,如果該配置為true,canal就會自動鏈到mysql備庫獲取binlog數據canal.instance.detecting.heartbeatHaEnable = false# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions deliverycanal.instance.transaction.size = 1024# mysql fallback connected to new master should fallback times# canal發生mysql切換時,在新的mysql庫上查找binlog時需要往前查找的時間,單位秒# 說明:mysql主備庫可能存在解析延遲或者時鍾不統一,需要回退一段時間,保證數據不丟canal.instance.fallbackIntervalInSeconds = 60# network config# 網絡鏈接參數,SocketOptions.SO_RCVBUFcanal.instance.network.receiveBufferSize = 16384# 網絡鏈接參數,SocketOptions.SO_SNDBUFcanal.instance.network.sendBufferSize = 16384# 網絡鏈接參數,SocketOptions.SO_TIMEOUTcanal.instance.network.soTimeout = 30# binlog filter config
# 是否使用druid處理所有的ddl解析來獲取庫和表名
canal.instance.filter.druid.ddl = true
# 是否忽略dcl語句
canal.instance.filter.query.dcl = false
# 是否忽略dml語句(mysql5.6之后,在row模式下每條DML語句也會記錄SQL到binlog中,可參考MySQL文檔)
canal.instance.filter.query.dml = false
# 是否忽略ddl語句
canal.instance.filter.query.ddl = false
# 是否忽略binlog表結構獲取失敗的異常(主要解決回溯binlog時,對應表已被刪除或者表結構和binlog不一致的情況)
canal.instance.filter.table.error = false
# 是否dml的數據變更事件(主要針對用戶只訂閱ddl/dcl的操作)
canal.instance.filter.rows = false
# 是否忽略事務頭和尾,比如針對寫入kakfa的消息時,不需要寫入TransactionBegin/Transactionend事件
canal.instance.filter.transaction.entry = false# binlog format/image check
# 支持的binlog format格式列表(otter會有支持format格式限制)
canal.instance.binlog.format = ROW,STATEMENT,MIXED
# 支持的binlog image格式列表(otter會有支持format格式限制)
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB# binlog ddl isolation
# ddl語句是否單獨一個batch返回(比如下游dml/ddl如果做batch內無序並發處理,會導致結構不一致)
canal.instance.get.ddl.isolation = false# parallel parser config
# 是否開啟binlog並行解析模式
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2| binlog並行解析的異步ringbuffer隊列(必須為2的指數)
canal.instance.parser.parallelBufferSize = 256# table meta tsdb info
# 是否開啟tablemeta的tsdb能力
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360#################################################
######### destinations #############
#################################################
# 當前server上部署的instance列表,多個使用逗號分隔
canal.destinations =
# conf root dir | conf/目錄所在的路徑
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance | 開啟instance自動掃描
# 如果配置為true,canal.conf.dir目錄下的instance配置變化會自動觸發
canal.auto.scan = true
# instance自動掃描的間隔時間,單位秒
canal.auto.scan.interval = 5
# v1.0.25版本新增,全局的tsdb配置方式的組件文件
canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
# 全局配置加載方式
canal.instance.global.mode = manager
# 全局lazy模式
canal.instance.global.lazy = false
# 全局的manager配置方式的鏈接信息
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
# 全局的spring配置方式的組件文件
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml##################################################
######### MQ #############
##################################################
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092
canal.mq.servers = 127.0.0.1:6667
# 發送失敗重試次數
canal.mq.retries = 0
# kafka為ProducerConfig.BATCH_SIZE_CONFIG,每次發送批量消息的個數
canal.mq.batchSize = 16384
# kafka為ProducerConfig.MAX_REQUEST_SIZE_CONFIG
canal.mq.maxRequestSize = 1048576
# kafka為ProducerConfig.LINGER_MS_CONFIG , 如果是flatMessage格式建議將該值調大, 如: 200
canal.mq.lingerMs = 100
# kafka為ProducerConfig.BUFFER_MEMORY_CONFIG
canal.mq.bufferMemory = 33554432
# 獲取canal數據的批次大小
canal.mq.canalBatchSize = 50
# 獲取canal數據的超時時間
canal.mq.canalGetTimeout = 100
# 是否為json格式
canal.mq.flatMessage = true
canal.mq.compressionType = none
# kafka為ProducerConfig.ACKS_CONFIG
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test - canl_local.properties
# 主要定義了本地webUI # register ip canal.register.ip = # canal admin config canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 # admin auto register canal.admin.register.auto = true canal.admin.register.cluster = canal.admin.register.name =
- example/instance.properties
一個example的目錄就是一個instance,canal要配置多個實例采集多個數據源mysql的話如下配置,然后把conf目錄下example復制多份,分別重命名,如下########################################################## destinations ##############################################################canal.destinations = example1,example2,example3————————————————################################################### mysql serverId , v1.0.26+ will autoGen|作為Mysql從節點的id,1.0.26版本之后會自動生成# canal.instance.mysql.slaveId=0# enable gtid use true/false | 是否啟用mysql gtid的訂閱模式canal.instance.gtidon=false# position info# mysql主庫鏈接地址canal.instance.master.address=rm-qj0lin9gpm5hn841b.mysql.rds.cnipaig2.cloud:3306# mysql主庫鏈接時起始的binlog文件canal.instance.master.journal.name=# mysql主庫鏈接時起始的binlog偏移量canal.instance.master.position=# mysql主庫鏈接時起始的binlog的時間戳canal.instance.master.timestamp=# mysql主庫鏈接時對應的gtid位點canal.instance.master.gtid=# rds oss binlogcanal.instance.rds.accesskey=canal.instance.rds.secretkey=canal.instance.rds.instanceId=# table meta tsdb infocanal.instance.tsdb.enable=fasle#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb#canal.instance.tsdb.dbUsername=canal#canal.instance.tsdb.dbPassword=canal#canal.instance.standby.address =#canal.instance.standby.journal.name =#canal.instance.standby.position =#canal.instance.standby.timestamp =#canal.instance.standby.gtid=# username/password# mysql數據庫帳號canal.instance.dbUsername=rds_test# mysql數據庫密碼canal.instance.dbPassword=rds_test123canal.instance.connectionCharset = UTF-8# enable druid Decrypt database passwordcanal.instance.enableDruid=false#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==# table regex| 需要采集的MySQL表過濾匹配表達式canal.instance.filter.regex=.*\\..*# table black regex 數據表黑名單過濾匹配表達式canal.instance.filter.black.regex=# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config | mq的topic信息canal.mq.topic=drds# dynamic topic route by schema or table regex|是否每張表動態匹配一個topic#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*# mq的分區數量canal.mq.partition=0# hash partition config | 散列模式的分區數#canal.mq.partitionsNum=3#canal.mq.partitionHash=test.table:id^name,.*\\..*#################################################
- canal.properties
- Canal安裝包文件目錄
- canal簡介
- Canal使用前提條件--安裝MySQL+開啟binlog支持(Mac)
- 下載Mac版本MySQL安裝包
mysql dmg安裝包下載地址,選擇版本和系統版本下載 https://downloads.mysql.com/archives/community/
- 安裝dmg安裝包
需要注意在最后一步安裝會生成一個root用戶秘密,需要記錄下來
- 設置環境變量
vim /etc/profile export PATH=$PATH:/usr/local/mysql/bin vim ~/.bash_profile export PATH=$PATH:/usr/local/mysql/bin source /etc/profile source ~/.bash_profile
- 啟動MySQL
apple---系統便好設置---mysql----啟動mysql
- 修改root用戶密碼+創建canal用戶
mysql -u root -p’你剛才的密碼’ alter user root@localhost identified by 'root'; GRANT ALL PRIVILEGES ON *.* TO 'root'@'%' IDENTIFIED BY '123456' ; -- canal的原理是模擬自己為mysql slave,所以這里一定需要做為mysql slave的相關權限 CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%' IDENTIFIED BY 'canal'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; flush privileges; -- 針對已有的賬戶可通過grants查詢權限: show grants for 'canal' ;
- 開啟MySQL的binlog日志
-- canal的原理是基於mysql binlog技術,所以這里一定需要開啟mysql的binlog寫入功能,並且配置binlog模式為row. vim /etc/my.cnf [mysqld] default-storage-engine=INNODB character-set-server=utf8 log-bin = mysql-bin binlog-format = ROW server_id = 1 port = 3306 [client] default-character-set=utf8 -- 重啟mysql--同啟動步驟 -- 驗證binlog是否開啟 mysql> show variables like 'binlog_format'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | binlog_format | ROW | +---------------+-------+ mysql> show variables like 'log_bin'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | log_bin | ON | +---------------+-------+
- 下載Mac版本MySQL安裝包
-
單機部署Canal(Mac)
- 下載安裝包
cd /data/app wget https://github.com/alibaba/canal/releases/download/canal-1.1.15/canal.deployer-1.1.15.tar.gz
- 解壓安裝包
cd /data/app mkdir canal-server-1.1.15 tar zxvf canal.deployer-$version.tar.gz -C canal-server-1.1.15
- 使用tcp模式-客戶端主動拉取模式
- 配置文件
- conf.canal.properties
################################################# ######### common argument ############# ################################################# # tcp bind ip canal.ip = # register ip to zookeeper canal.register.ip = canal.port = 11111 canal.metrics.pull.port = 11112 # canal instance user/passwd # canal.user = canal # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458 ################################################# ######### destinations ############# ################################################# canal.destinations = example # conf root dir canal.conf.dir = ../conf # auto scan instance dir add/remove and start/stop instance canal.auto.scan = true canal.auto.scan.interval = 5 # set this value to 'true' means that when binlog pos not found, skip to latest. # WARN: pls keep 'false' in production env, or if you know what you want. canal.auto.reset.latest.pos.mode = false # tcp, kafka, rocketMQ, rabbitMQ canal.serverMode = tcp
- conf/example/instance.properties
################################################# ## mysql serverId , v1.0.26+ will autoGen canal.instance.mysql.slaveId=1234 # position info canal.instance.master.address=127.0.0.1:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # username/password canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex #canal.instance.filter.regex=.*\\..* canal.instance.filter.regex=douc_0827\\..*,dosm_activiti_0827\\..*
- conf.canal.properties
- 啟動canal
cd /data/app/canal-server-1.1.15 ./bin/startup.sh
- 查看日志
canal服務器日志文件目錄 cd /data/app/canal-server-1.1.15/logs/canal example實例日志文件目錄 cd /data/app/canal-server-1.1.15/logs/example
- 項目搭建與測試
- pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>spring-boot-cancal</artifactId> <groupId>com.example</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion> <artifactId>canal-use-demo</artifactId> <dependencies> <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-to-slf4j</artifactId> <version>2.7</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.48</version> <scope>runtime</scope> </dependency> <!--引入canal依賴--> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.5</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.common</artifactId> <version>1.1.5</version> </dependency> <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.protocol</artifactId> <version>1.1.5</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>druid</artifactId> <version>1.2.6</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>1.1.1</version> </dependency> </dependencies> </project>
- 工具類
public class CanalUtils { /** * 獲取canal連接器 */ public static CanalConnector getSingleCanalConnector(String host,int port,String desc,String user,String password){ CanalConnector connector = CanalConnectors.newSingleConnector( new InetSocketAddress(host,port), desc, user, password); return connector; } /** * 獲取集群canal連接器 */ public static CanalConnector getClusterCanalConnector(List<? extends SocketAddress> addresses, String desc, String user, String password){ CanalConnector connector = CanalConnectors.newClusterConnector(addresses, desc, user, password); return connector; } public static CanalConnector getClusterCanalConnectorV2(List<Map<String,String>> adrs, String desc, String user, String password){ List<InetSocketAddress> addresses = new ArrayList<>(); adrs.forEach(en ->{ String host = en.get("host"); String port = en.get("port"); addresses.add(new InetSocketAddress(host,Integer.parseInt(port))); }); CanalConnector connector = CanalConnectors.newClusterConnector(addresses, desc, user, password); return connector; } /** * 獲取由zookeeper組成的canal集群鏈接 */ public static CanalConnector getClusterCanalConnector(String zkHost, String desc, String user, String password){ CanalConnector connector = CanalConnectors.newClusterConnector(zkHost, desc, user, password); return connector; } public static CanalConnector getLocalCanalConnector(){ // return getCanalConnector(AddressUtils.getHostIp(), 11111, "example", "", ""); return getSingleCanalConnector("127.0.0.1", 11111, "example", "", ""); } /** * 獲取MySQL連接 */ public static Connection getMySqlConnnection(String url,String userName,String password){ Connection connection = null; try{ Class.forName("com.mysql.jdbc.Driver"); connection = DriverManager.getConnection(url,userName,password); }catch (Exception e){ System.out.println(e.getLocalizedMessage()); } return connection; } public static boolean execute(Connection conn,String sql){ try(PreparedStatement statement = conn.prepareStatement(sql)){ return statement.execute(); }catch (Exception e){ System.out.println(e.getLocalizedMessage()); } return false; } /** * 獲取數據連接池 */ public static DataSource getDataSource(String url,String user,String psd){ DruidDataSource dataSource = new DruidDataSource(); // 2、為數據源實例指定必須的屬性 dataSource.setUsername(user); dataSource.setPassword(psd); dataSource.setUrl(url); dataSource.setDriverClassName("com.mysql.jdbc.Driver"); // 3、指定數據源的一些可選屬性 // 3.1、指定數據庫連接池中出使化連接數的個數 dataSource.setInitialSize(5); // 3.2、指定最大連接數:同意時刻可以同時向數據庫申請的連接數 dataSource.setMaxActive(20); // 3.3、指定最小連接數:在數據庫連接池中保存的最少的空閑連接的數量 dataSource.setMinIdle(2); // 3.4、等待數據庫連接池分配連接的最長時間。單位為毫秒。超出時間將拋出異常 dataSource.setMaxWait(1000 * 5); // 4、從數據源中獲取數據庫連接 return dataSource; } /** * 數據連接池獲取鏈接 */ public static Connection getConnFromPool(DataSource dataSource) throws Exception{ return dataSource.getConnection(); } }
- 測試類
public class SimpleCanalClientExample { public static void main(String[] args) { // 創建鏈接 CanalConnector connector = CanalUtils.getLocalCanalConnector(); int batchSize = 1000; int emptyCount = 0; int dataCount = 0; try { // 打開鏈接 connector.connect(); // todo 訂閱數據庫表,全部表 // connector.subscribe(".*\\..*"); /** * todo 針對cw_dodb,abc數據庫表數據操作都會追逐,其他數據庫級別的操作(創建|修改)也會追蹤,針對其他庫中表操作不會記錄 * -- 方式一: 在服務器/canal-server-1.1.5/conf/example/instance.properties 進行如下配置也可以 * canal.instance.filter.regex=cw_dodb\\..*,abc\\..* * 在代碼中使用服務器中默認配置的,也就是不重新調用subscribe方法 * -- 方式二: 代碼中覆蓋 connector.subscribe("cw_dodb\\..*,abc\\..*"); * */ // connector.subscribe("cw_dodb\\..*,abc\\..*"); connector.subscribe("a1\\..*"); // 回滾到為進行ack的地方,下次fetch的時候,可以從最后一個沒有ack的地方開始獲取 connector.rollback(); while (true) { // 獲取指定數量的數據 Message message = connector.getWithoutAck(batchSize); // 獲取批量ID long batchId = message.getId(); // 獲取批量的數量 int size = message.getEntries().size(); // 如果沒有數據的處理邏輯 if (batchId == -1 || size == 0) { emptyCount++; // System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { // 有數據的處理emptyCount = 0; dataCount++; // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } // 提交確認,標記ack connector.ack(batchId); System.out.println("---handled data counts: "+dataCount); // 處理失敗, 回滾數據 // connector.rollback(batchId); } } finally { connector.disconnect(); } } /** * 打印canal server解析binlog獲取的實體類信息 * @param entrys */ private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { // 開啟/關閉事務的實體類型,跳過 continue; } /** * RowChange對象,包含了一行數據變化的所有特征 * 比如isDdl 是否是ddl變更操作 sql 具體的ddl sql beforeColumns afterColumns 變更前后的數據字段等等 */ RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } // 獲取針對Mysql數據操作類型:insert/update/delete EventType eventType = rowChage.getEventType(); // 打印Header信息 System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); // 判斷是否是DDL語句 if(rowChage.getIsDdl()){ System.out.println("================》;isDdl: true,sql: " + rowChage.getSql()); } List<RowData> rowDatasList = rowChage.getRowDatasList(); if(!CollectionUtils.isEmpty(rowDatasList)) { // todo 如果不是DDL的話獲取不到原始sql語句,只能通過數據組裝成SQL String sql = rowChage.getSql(); System.out.println("------sql--------: "+sql); // 獲取RowChange對象里的每一行數據,打印出來 for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { System.out.println("----------------delete:----------------"); printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { System.out.println("-----------------insert:-----------------"); printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } } private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println("\t"+column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }
- pom.xml
- 項目運行與MySQL操作測試
- 配置文件
- 使用mq模式-服務端主動推送模式
- 配置文件
- conf/canal.propertie
# tcp, kafka, rocketMQ, rabbitMQ canal.serverMode = kafka ################################################## ######### Kafka ############# ################################################## kafka.bootstrap.servers = 127.0.0.1:9092 kafka.acks = all kafka.compression.type = none kafka.batch.size = 16384 kafka.linger.ms = 1 kafka.max.request.size = 1048576 kafka.buffer.memory = 33554432 kafka.max.in.flight.requests.per.connection = 1 kafka.retries = 0
-
conf/canal.properties
# mq config canal.mq.topic=canal-binlog # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* canal.mq.dynamicTopic= canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #canal.mq.partitionHash=test.table:id^name,.*\\..* #canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
- conf/canal.propertie
- 啟動canal
cd /data/app/canal-server-1.1.15 ./bin/startup.sh
- MySQL操作+件套Kafka消息
- 配置文件
- 下載安裝包
- 集群部署Canal(Linux-host01,host02)
- zookeeper安裝
- canal集群部署與測試
- canal集群工作原理
一個canal服務器進程,每一個instance就是一個線程,單獨對應一個mysql服務器的binlog。再起一個canal服務的話,對於同一個mysql服務器不能做負載均衡,數據分片等。有兩個canal服務器 都監控一個或多個mysql服務器的binlog。 這兩個canal服務同時只能有一個提供服務,當提供服務的這個宕機時,zookeeper能知道,zookeeper就通知另一個canal服務器讓他提供服務。當原來宕機的那個再啟動起來時,是搶占模式的, 誰搶到就誰上,沒搶到就standy模式。 canal本身就是一個工具不存數據,宕機了就宕機,只有還有另外一個能提供服務就行,所以沒有什么同步問題(不像數據庫有同步問題)。因為啟動canal服務是需要消耗資源的,不想redis高可用 占資源太少了。canal的standy資源也不能給少了,要雙份資源,就看企業在意不在意,服務核心不核心。 maxwell和canal非常像,maxwell連高可用機制都沒提供,倒了就再起。其實很多軟件都是不提供高可用方案的,如果怕他倒的話,可以用Keepalived,這個機制很簡單就是做心跳監測,可以給任何 進程做一個心跳檢測,可以一直檢測他在不在進程列表里,如果宕了進程沒了他會有一系列觸發操作,可以在他里面寫一個shell,如還有一個備機,要是這個掛了就在備機啟動。或者自己手工在restart, 這是一種通用型方案。Keepalived和maxwell是完全沒有耦合關系的,maxwell完全不知道Keepalived的存在,Keepalived是從外圍的觀察者觀察這個進程,不像zookeeper,是需要向它注冊的。 注意:這里zookeeper為觀察者監控的模式,只能實現高可用,而不是負載均衡, 即同一時點只有一個canal-server節點能夠監控某個數據源,只要這個節點能夠正常工作, 那么其他監控這個數據源的canal-server只能做stand-by,直到工作節點停掉,其他canal-server節點才能搶占。
- canal服務器配置文件修改
======= host01 # 修改配置信息 cd /data/app/canal-server-1.1.5 rm -rf conf/example/meta.dat rm -rf conf/example/h2* cd conf vim canal.properties ################################################# ######### common argument ############# ################################################# # tcp bind ip canal.ip = # register ip to zookeeper 每個實例推薦使用不同的ip canal.register.ip = 10.1.1.2 canal.port = 11111 canal.metrics.pull.port = 11112 canal.zkServers =127.0.0.1:2181 ====== host02 # 進行復制和修改 cd /data/app/canal-server-1.1.5 rm -rf conf/example/meta.dat rm -rf conf/example/h2* cd conf vim canal.properties ################################################# ######### common argument ############# ################################################# # tcp bind ip canal.ip = # register ip to zookeeper 每個實例推薦使用不同的ip canal.register.ip = 10.1.1.3 canal.port = 11111 canal.metrics.pull.port = 11112 canal.zkServers =127.0.0.1:2181 vim example/instance.properties canal.instance.mysql.slaveId=1235
- zookeeper啟動+服務狀態檢查
1. 啟動zookeeper [root@localhost data]# zkServer.sh start ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper-3.4.13/bin/../conf/zoo.cfg Starting zookeeper ... STARTED 2. 啟動狀態檢查 [root@VM-16-13-centos zookeeper-3.4.13]# zkServer.sh status ZooKeeper JMX enabled by default Using config: /usr/local/zookeeper-3.4.13/bin/../conf/zoo.cfg Mode: standalone
- canal集群啟動
-- host01,host02上分別執行
cd /data/app/canal-server-1.1.5 ./bin/startup.sh結合如上截圖中,兩個canal服務器節點通過zookeeper組成集群,兩個節點中的instance相同監聽同一個MySQL的數據信息,當集群啟動時發現最初啟動節點的logs目錄下存在實例日志文件夾 example,另外一個節點logs目錄下卻沒有,符合canal集群高可用原則,一個處於active一個處於standy狀態 - canal集群高可用驗證
- 在節點10.177.146.33(active)上關閉canal
- 執行MySQL數據操作,發現canal的kafka依舊有數據寫入
- 查看另外一個節點10.177.146.32(standy),發現logs下出現example實例日志
- 在節點10.177.146.33上刪除logs下所有文件,重啟canal之后發現logs目錄下未生成example實例日志
- 總結: 該過程中,active狀態節點流轉為33-->32
- 在節點10.177.146.33(active)上關閉canal
- canal集群工作原理
- Canal擴展(業務需要)
- 需求說明
-- 使用canal轉發收集到的binlog日志到kafka,使用的是canal自定義的JSON數據格式,如下所示 { "data":[ { "a":"ee" } ], "database":"abc", "es":1637578296000, "id":19, "isDdl":false, "mysqlType":{ "a":"varchar(100)" }, "old":null, "pkNames":null, "sql":"", "sqlType":{ "a":12 }, "table":"abc", "ts":1637578297135, "type":"INSERT" } -- 為了快速接入Dodb,考慮到Dodb有現有功能--Kafka數據源(帶存儲型),可以將kafka中數據映射到Dodb的數據表ClickHouse表中 -- 考慮到Clickhouse的update,delete性能比較低,將所有的update,delete操作轉換為insert操作 新增倆個字段version(DateTime),sign(Int),version使用插入時間毫秒數,sign更新|插入時使用1,刪除時使用-1,不過后期需要考慮clickhouse數據去重 { "number":"1", "commodity_price":"1.0", "sign":1, "description":"e8023b930e384692978af8a7af4ea6f6", "id":"d26eea16687a40e2be925cae162c9bd3", "version":1637560526296, "commodity_name":"2" } { "number":"1", "commodity_price":"1.0", "sign":-1, "description":"e8023b930e384692978af8a7af4ea6f6", "id":"d26eea16687a40e2be925cae162c9bd3", "version":1637560726488, "commodity_name":"2" } -- 將收集的不同表中binlog變更數據,分發到對應的各自topic中,之后在dodb中創建對應的Kafka存儲型 數據源中,將數據寫入Clickhouse數據表
- 擴展思路一
binlog(MySQL)->canal server->kafka producer(Canal內嵌)->kafka consumer(擴展)->kafka producer(擴展)->dodb kafka存儲型數據源->ClickHouse -- kafka consumer(擴展項目程序中) 1. 過濾符合條件的數據,根據配置的數據庫名稱+數據表名稱 2. 數據轉換,將新增,刪除,更新操作統一轉換為新增操作,使用字段version-Long,sign—Int作為標識 3. 數據分發,針對采集的表數據,經過轉換后每張表數據轉發至對應的單個topic中 -- kafka producer(擴展項目程序中) 4. 完成數據轉發操作
- canal原生內嵌kafka發送數據樣例
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic canal-binlog --property print.key=true null {"data":[{"id":"aba1b95e24dd4434909177c2c14f3020","commodity_name":"1","commodity_price":"3.0","number":"3","description":"79d9782c3d174c918fe64ef66d1bc039"}],"database":"abc","es":1637560518000,"id":5,"isDdl":false,"mysqlType":{"id":"varchar(32)","commodity_name":"varchar(512)","commodity_price":"double","number":"int(10)","description":"varchar(2048)"},"old":[{"commodity_name":"desc1"}],"pkNames":["id"],"sql":"","sqlType":{"id":12,"commodity_name":12,"commodity_price":8,"number":4,"description":12},"table":"abc","ts":1637560518949,"type":"UPDATE"} null {"data":[{"id":"d26eea16687a40e2be925cae162c9bd3","commodity_name":"2","commodity_price":"1.0","number":"1","description":"e8023b930e384692978af8a7af4ea6f6"}],"database":"abc","es":1637560526000,"id":6,"isDdl":false,"mysqlType":{"id":"varchar(32)","commodity_name":"varchar(512)","commodity_price":"double","number":"int(10)","description":"varchar(2048)"},"old":[{"commodity_name":"desc1"}],"pkNames":["id"],"sql":"","sqlType":{"id":12,"commodity_name":12,"commodity_price":8,"number":4,"description":12},"table":"abc","ts":1637560526294,"type":"UPDATE"} null {"data":[{"id":"d26eea16687a40e2be925cae162c9bd3","commodity_name":"2","commodity_price":"1.0","number":"1","description":"e8023b930e384692978af8a7af4ea6f6"}],"database":"abc","es":1637560726000,"id":7,"isDdl":false,"mysqlType":{"id":"varchar(32)","commodity_name":"varchar(512)","commodity_price":"double","number":"int(10)","description":"varchar(2048)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":12,"commodity_name":12,"commodity_price":8,"number":4,"description":12},"table":"abc","ts":1637560726484,"type":"DELETE"} null {"data":[{"id":"wwwwww","commodity_name":"1","commodity_price":"0.0","number":"0","description":"wwwww"}],"database":"abc","es":1637560787000,"id":8,"isDdl":false,"mysqlType":{"id":"varchar(32)","commodity_name":"varchar(512)","commodity_price":"double","number":"int(10)","description":"varchar(2048)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":12,"commodity_name":12,"commodity_price":8,"number":4,"description":12},"table":"abc","ts":1637560787488,"type":"INSERT"}
- 添加擴展程序后kafka發送數據樣例
-- topic 為指定表名稱+特定配置的前后綴組成 kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic mysql_abc_topic --property print.key=true null {"number":"3","commodity_price":"3.0","sign":1,"description":"79d9782c3d174c918fe64ef66d1bc039","id":"aba1b95e24dd4434909177c2c14f3020","version":1637560518952,"commodity_name":"1"} null {"number":"1","commodity_price":"1.0","sign":1,"description":"e8023b930e384692978af8a7af4ea6f6","id":"d26eea16687a40e2be925cae162c9bd3","version":1637560526296,"commodity_name":"2"} null {"number":"1","commodity_price":"1.0","sign":-1,"description":"e8023b930e384692978af8a7af4ea6f6","id":"d26eea16687a40e2be925cae162c9bd3","version":1637560726488,"commodity_name":"2"} null {"number":"0","commodity_price":"0.0","sign":1,"description":"wwwww","id":"wwwwww","version":1637560787492,"commodity_name":"1"}
- 擴展代碼開發
- pom.xml
<dependencies> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.15</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.11</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.20</version> </dependency> </dependencies> <build> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>2.0.2</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>utf-8</encoding> </configuration> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> </plugin> </plugins> </build>
- 靜態變量類
public class Constants { /**標記字段 1-更新|新增 -1-刪除*/ public static final String SIGN = "sign"; /** 聚合處理字段*/ public static final String VERSION = "version"; public static final Integer ADD_UPDATE = 1; public static final Integer DELETE = -1; /** * 目的數據topic * -- 數據處理完畢之后發送數據的topic * -- 采用每張表的一個topic */ public static final String TOPIC_PREFIX = "mysql_"; public static final String TOPIC_SUFFIX = "_topic"; /**canal轉發的kafka消息消費組i*/ public static final String CONSUMER_GROUP_NAME = "from_canal_kafka"; }
- 配置文件
/** * @version 1.0.0 * @ClassName ReWrapperConfig.java * @Description TODO * canal.properties配置文件中添加如下參數 #此參數包含需要采集的數據庫.數據表名 custom.async.mysql.db_table.list=douc.dosm_data_dictionary,douc.tab,abc.abc,abc.abcd # 消費canal轉發至kafka的binlog消息,進行數據組裝+拆分之后發送最終kafka的topic的>前后綴 # 注意: 處理后的每個張表的數據發送往 prefix_table_suffix topic且增刪改操作都處>理為增,使用sign,version區分 custom.async.kafka.topic.prefix = mysql_ custom.async.kafka.topic.suffix = _topic */ @Data @Slf4j public class ReWrapperConfig { /**源數據topic -- 對標 canal.mq.topic*/ private String sourceTopic; /**kafka地址 -- 對標 kafka.bootstrap.servers*/ private String kafkaServer; private Thread thread; private KafkaConsumer<String,String> consumer; private KafkaProducer<String, String> producer; /** * 上面將數據庫和數據庫和數據表拆分為兩個參數是有問題的 * -- 此參數包含數據庫.數據表名 * -- canal.properties中 新增字段custom.async.mysql.db_table.list */ private String mysqlDbTables; private List<String> dbTables; /** * 解析之后每張表數據采集發送到一個topic中 * -- dodb創建kafka存儲性的數據源,映射到對應的表中 * -- canal.properties中 新增字段custom.async.kafka.topic.prefix * -- canal.properties中 新增字段custom.async.kafka.topic.suffix */ private String topic_prefix; private String topic_suffix;
public void init(){ consumer = consumerFromCanalOrinal(); producer = kafkaProducer(); KafkaReWrapperHandler kafkaReWrapperHandler = new KafkaReWrapperHandler(consumer,producer,this); thread = new Thread(kafkaReWrapperHandler); thread.start(); } public void destory(){ if(null != consumer){ consumer.close();} if(null != producer){producer.close(); } if(null != null && !thread.isInterrupted()){ thread.interrupt();} } /** * canal轉發的kafka消息消費者 * @return */ public KafkaConsumer<String,String> consumerFromCanalOrinal(){ Properties props = new Properties(); // 定義kakfa 服務的地址,不需要將所有broker指定上 props.put("bootstrap.servers", kafkaServer); // 制定consumer group props.put("group.id",CONSUMER_GROUP_NAME); // 是否自動確認offset props.put("enable.auto.commit", "false"); // 自動確認offset的時間間隔 props.put("auto.commit.interval.ms", "1000"); // key的反序列化類 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的反序列化類 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("session.timeout.ms", "60000"); props.put("max.poll.interval.ms", "50000"); props.put("max.poll.records", "5000"); props.put("auto.offset.reset", "latest"); props.put("key.deserializer.encoding", "UTF8"); props.put("value.deserializer.encoding", "UTF8"); props.put("max.partition.fetch.bytes", 209715200); props.put("receive.buffer.bytes", 1024 * 1024 * 10); props.put("fetch.min.bytes", 300); KafkaConsumer<String, String> stringStringKafkaConsumer = new KafkaConsumer<>(props); stringStringKafkaConsumer.subscribe(Arrays.asList(sourceTopic)); return stringStringKafkaConsumer; } /** * 重新處理之后發送Kafka的生產者 * -- 數據來源於初始canal轉kafka的binlog數據 * -- 數據進行處理 * -- 發送給各個表對應的topic * @return */ public KafkaProducer<String, String> kafkaProducer() { Properties props = new Properties(); props.put("bootstrap.servers", kafkaServer); /*ack方式,all,會等所有的commit最慢的方式; 1:由Leader確認 如果不是集群部署模式需要將該行代碼注釋掉,否則報錯找不到當前topic可用的Leader*/ //props.put("acks", "1"); // 失敗是否重試,設置會有可能產生重復數據 props.put("retries", 0); // 對於每個partition的batch buffer大小 props.put("batch.size", 16384); // 等多久,如果buffer沒滿,比如設為1,即消息發送會多1ms的延遲,如果buffer沒滿 props.put("linger.ms", 1); // 整個producer可以用於buffer的內存大小 props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); /*cananl內部存在一個kafka的生產者,需要額外配置該參數,否則將報錯 WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=DemoProducer*/ props.put("client.id","9999"); return new KafkaProducer<>(props); } /** * 最終數據發送的topic * @param table * @return */ public String getKafkaTopic(String table){ if(Objects.nonNull(topic_prefix) && Objects.nonNull(topic_suffix)){ return topic_prefix+table+topic_suffix; } return TOPIC_PREFIX+table+TOPIC_SUFFIX; } /** * 最新的校驗是否是采集的數據 * -- 所有的配置數據從配置文件canal.properties文件中獲取 * -- 禁止從常量類Constants中獲取 * @param db 數據庫名 * @param table 數據表名 * @return */ public boolean isCollectedTable(String db,String table){ if(Objects.isNull(mysqlDbTables)){ return false; } String dt = db+"."+table; return getDbTables().contains(dt); } private List<String> getDbTables(){ if(Objects.isNull(dbTables)){ if(Objects.isNull(mysqlDbTables)){ return dbTables = new ArrayList<>(); } dbTables = Arrays.asList(mysqlDbTables.split(",")) .stream() .distinct() .collect(Collectors.toList()); } return dbTables; } } - 處理類
@Data @Slf4j public class KafkaReWrapperHandler implements Runnable{ private KafkaConsumer<String,String> consumer; private KafkaProducer<String,String> producer; private ReWrapperConfig config; public KafkaReWrapperHandler(KafkaConsumer<String,String> consumer,KafkaProducer<String,String> producer,ReWrapperConfig config){ this.consumer = consumer; this.producer = producer; this.config = config; } @Override public void run() { log.error("------啟動KafkaConsumer2---------"); while (!Thread.currentThread().isInterrupted()) { ConsumerRecords<String, String> consumerRecords = consumer.poll(200); for (ConsumerRecord record : consumerRecords) { try{ log.debug("---kafkaConsumer2接收到 :" + record.topic() + "------" + record.key() + "---------" + record.value()); JSONObject jsonObject = JSONObject.parseObject(record.value().toString()); // 變更的數據 JSONArray data = jsonObject.getJSONArray("data"); int size = null == data ? 0 : data.size(); // 數據庫信息 String database = jsonObject.getString("database"); Boolean isDdl = jsonObject.getBoolean("isDdl"); JSONArray old = jsonObject.getJSONArray("old"); // 主鍵信息 JSONArray pkNames = jsonObject.getJSONArray("pkNames"); // sql語句 String sql = jsonObject.getString("sql"); JSONObject sqlType = jsonObject.getJSONObject("sqlType"); // 表名 String table = jsonObject.getString("table"); // 操作類型 String type = jsonObject.getString("type"); log.debug("---當前操作類型:{}", type); log.debug("---當前表名稱:{}", table); if(Objects.nonNull(database) && Objects.nonNull(table)){ if(!config.isCollectedTable(database,table)){ log.debug("-- 當前數據操作發生在數據表{}, 該表不屬於采集范疇:[{}]",database+"."+table,config.getMysqlDbTables()); continue; } }else{ log.warn("未獲取到表名信息,應該不是針對表數據的操作"); if (isDdl) { log.error("----DDL SQL : {}", sql); } continue; } log.warn("---kafkaConsumer2接收需要采集的數據 :" + record.topic() + "------" + record.key() + "---------" + record.value()); /** * -- Database 數據庫級別 * drop |create database 都是QUERY類型,都屬於ddl * * -- Table 表級別操作 * create table 屬於CREATE類型,屬於ddl,table為表名 * rename table 屬於RENAME類型,屬於ddl,table為表名 * drop table 屬於ERASE操作類型 * * -- Field 表字段級別操作 * add column 屬於ALTER類型,屬於ddl * modify column name|type 屬於ALTER類型,屬於ddl * drop column 屬於ALTER類型,屬於ddl * * -- 表數據操作 * insert data 屬於INSERT類型,(批量)單個insert操作在kafka中都是單獨消息 * update data 屬於UPDATE類型,單個更新每個update操作都在Kafka中都是單獨消息 * 批量更新- 單個消息data和old都是多個數據-更新數數組數據 * delete data 屬於DELETE類型,單個刪除在kafka中都是一個單獨消息 * 批量刪除- 單個消息包含多個操作 */ String topic = config.getKafkaTopic(table); Long ts = jsonObject.getLong("ts"); if(null == ts){ ts = System.currentTimeMillis(); } if(null != data){ addFieldsAndSend(topic, data, type,ts); } }catch (Exception e){ log.error("---消費從canal轉發kafka的消息失敗{}/{}",e.getLocalizedMessage(),e); }finally { // 手動提交 consumer.commitAsync(); } } } } private void addFieldsAndSend(String topic, JSONArray data, String type,Long ts) { for (int i = 0; i < data.size(); i++) { JSONObject jsonObject = data.getJSONObject(i); // Long類型的數據比DateTime類型在后續Clickhouse中數據去重時辨識度更高 ts = ts +(i+1); jsonObject.put(VERSION,ts); switch (type){ case "INSERT": jsonObject.put(SIGN,ADD_UPDATE); break; case "UPDATE": jsonObject.put(SIGN,ADD_UPDATE); break; case "DELETE": jsonObject.put(SIGN,DELETE); break; } String message = jsonObject.toJSONString(); producer.send(new ProducerRecord<>(topic,message)); log.error("----發送Kafka消息 ,topic-{},message-{}",topic,message); } } }
- 擴展canal操作
- 將jar包放入canal的lib目錄下
- 配置文件修改
- conf/canal.properties新增配置屬性
################################################## ####### Custom ################# ################################################## # 需要采集的數據表--帶數據庫名 ,使用英文逗號分隔 custom.async.mysql.db_table.list=douc.sys_group,douc.sys_group_user,douc.sys_user,douc.sys_department # 注意: 處理后的每個張表的數據發送往 prefix_table_suffix topic且增刪改操作都處理為增,使用sign,version區分 custom.async.kafka.topic.prefix = mysql_ custom.async.kafka.topic.suffix = _topic
- conf/spring/file-instance.xml中注入擴展程序中的組件
<bean id="ReWrapperConfig" class="com.canal.config.ReWrapperConfig" init-method="init" destroy-method="destory"> <property name="sourceTopic" value="${canal.mq.topic}"/> <property name="kafkaServer" value="${kafka.bootstrap.servers}"/> <property name="mysqlDbTables" value="${custom.async.mysql.db_table.list}" /> <property name="topic_prefix" value="${custom.async.kafka.topic.prefix}" /> <property name="topic_suffix" value="${custom.async.kafka.topic.suffix}" /> </bean>
- conf/canal.properties新增配置屬性
- 不足|缺點
未處理好與canal內嵌kafka依賴的關系,導致打出來的jar包比較大
- pom.xml
- 擴展思路二
--直接針對canal內嵌的KafkaProducer進行處理,將數據處理和數據分發功能在源碼中處理
- 待續。。。
- 需求說明
- Canal源碼編譯安裝(待續。。。)