一、canal介紹
官網的介紹:
名稱:canal [kə'næl]
譯意: 水道/管道/溝渠
語言: 純java開發
定位: 基於數據庫增量日志解析,提供增量數據訂閱&消費,目前主要支持了mysql
關鍵詞: mysql binlog parser / real-time / queue&topic
canal是阿里巴巴使用純java語言開發的一款基於數據庫日志增量解析,以提供增量數據訂閱和消費的軟件.
其實說白了,canal就是一款實現增量數據同步的工具,當前只支持監控並解析binlog。
那canal可以做哪些事情呢:
- 數據庫鏡像
- 數據庫實時備份
- 數據庫多級索引的維護
- 業務緩存刷新
- 帶有業務邏輯數據的實時處理
canal的工作原理
Mysql主從同步原理
從圖中可以看到:
- master實例將數據庫的變更(delete,update,insert,...)日志順序寫入到binary log當中。
- 當slave連接到master的時候,master會為slave開啟一個binlog dump線程,當master的binlog發生變化的時候,binlog dump線程會通知slave,並將變化的binlog數據發送給slave。
- 當主從同步開啟的時候,在slave上會創建2個線程
- IO Thread
該線程連接到master機器,master上的binlog dump線程會將binlog內容發送給該線程,該IO線程接收到binlog內容之后,再將內容寫到本地的relay log中。 - SQL Thread
該線程讀取IO線程寫入的relay log。並根據relay log的內容對slave數據庫做相應的操作
- IO Thread
Canal 原理
canal的工作原理其實是模擬了數據庫的主從同步機制,將自己偽裝成mysql slave:
- 模擬mysql master與slave的通信協議,它向master發送dump請求
- master收到canal發送過來的請求之后,開始推送binlog給canal
- canal接受binlog進行解析binary log對象(原始為protobuf byte流)並sink到下游(如:mysql,kafka,es,...)
Canal 架構
說明:
- server代表的是一個canal運行實例,代表的是一個jvm
- instace對應的是一個數據隊列(一個server可以部署多個insance)
instance模塊:
- eventParser:數據源接入,模擬slave和master進行交互,協議解析
- eventSink:Parser和Store的連接器,進行數據過濾,加工,分發工作
- eventStore:數據存儲
- metaManager:增量訂閱&消費信息管理器
二、Canal 的搭建
Mysql的配置
當前canal支持的版本有 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x
-
對於自建的mysql,需要進行如下配置:
- 開啟MySQL的binlog全日制功能
- 配置binlog-format為ROW模式
對應的my.cnf中的配置如下:
[mysqld] log-bin=mysql-bin # 開啟 binlog binlog-format=ROW # 選擇 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復
- 重啟mysql服務
- 創建
canal
用戶,並進行授權,使其具有mysql slave
的權限:
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; FLUSH PRIVILEGES;
-
如果使用的是RDS數據庫,則直接進行創建
canal
用戶並授權操作即可。
Canal的安裝
-
下載
點擊這里,下載所需要的版本的安裝包,我這里以1.1.4為例:
在下方找到deployer包:
-
解壓
執行
tar zxvf ./canal.deployer-1.1.4.tar.gz
之后可以看到解壓后的目錄結構如下: -
配置
進入到conf目錄下
cd conf
,可以看到有一個example的文件夾,這個是canal自帶的一個instance文件夾,我們需要拷貝一個並重名為我們自己的cp -r ./example ./route
,最終目錄結構像這樣:執行
vi route/instance.properties
編輯配置文件################################################# ## mysql serverId , v1.0.26+ will autoGen canal.instance.mysql.slaveId=1234 # enable gtid use true/false canal.instance.gtidon=false # position info 這里需要改成自己的數據信息 canal.instance.master.address=192.168.2.226:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password 這里需要改成前面創建並授權了的數據庫信息 canal.instance.dbUsername=canal canal.instance.dbPassword=canal@winner # The encoding that represents the databases corresponds to the encoding type in Java,such as UTF-8,GBK , ISO-8859-1 canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex 這里配置需要過濾的表名,正則語法,多個表(庫名.表名),使用逗號分隔開來,(.*\\..* 表示讀取所有的庫中的表),這里列舉過濾兩張表為例 canal.instance.filter.regex=db1\\.user,db2\\.device # table black regex canal.instance.filter.black.regex= # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # mq config 如果是准備將解析后的日志發送到kafka,這里用來配置每個表的數據發送到那個topic # 如果是打算所有的監控的標的日志數據都打到一個topic中的話,可以這樣設置一個topic名即可 canal.mq.topic=example # dynamic topic route by schema or table regex 這里是動態topic的配置 # 如果你打算將不同的表的日志打到不同的topic中里面去的話,可以打開下面的配置,格式為[topic:table],如果是多個,可使用逗號分隔, 當然上面的靜態topic和此動態topic是可以同時打開的 canal.mq.dynamicTopic=bi_binlog_config_topic:db1\\.user,bi_binlog_config_topic:db2\\.device canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3
執行
vi canal.properties
修改該配置文件################################################# ######### common argument ############# ################################################# # tcp bind ip 配置canal所在機器的ip canal.ip = 192.168.2.223 # register ip to zookeeper canal.register.ip = canal.port = 11111 canal.metrics.pull.port = 11112 # canal instance user/passwd # canal.user = canal # canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458 # canal admin config #canal.admin.manager = 127.0.0.1:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441 # 配置canal的zk地址 canal.zkServers = 192.168.1.227:2181,192.168.1.226:2181,192.168.1.225:2181 # flush data to zk canal.zookeeper.flush.period = 1000 canal.withoutNetty = false # tcp, kafka, RocketMQ canal.serverMode = kafka # flush meta cursor/parse position to file canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 ## memory store RingBuffer size, should be Math.pow(2,n) canal.instance.memory.buffer.size = 16384 ## memory store RingBuffer used memory unit size , default 1kb canal.instance.memory.buffer.memunit = 1024 ## meory store gets mode used MEMSIZE or ITEMSIZE canal.instance.memory.batch.mode = MEMSIZE canal.instance.memory.rawEntry = true ## detecing config canal.instance.detecting.enable = false #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() canal.instance.detecting.sql = select 1 canal.instance.detecting.interval.time = 3 canal.instance.detecting.retry.threshold = 3 canal.instance.detecting.heartbeatHaEnable = false # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery canal.instance.transaction.size = 1024 # mysql fallback connected to new master should fallback times canal.instance.fallbackIntervalInSeconds = 60 # network config canal.instance.network.receiveBufferSize = 16384 canal.instance.network.sendBufferSize = 16384 canal.instance.network.soTimeout = 30 # binlog filter config canal.instance.filter.druid.ddl = true canal.instance.filter.query.dcl = false canal.instance.filter.query.dml = false canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false canal.instance.filter.rows = false canal.instance.filter.transaction.entry = false # binlog format/image check canal.instance.binlog.format = ROW,STATEMENT,MIXED canal.instance.binlog.image = FULL,MINIMAL,NOBLOB # binlog ddl isolation canal.instance.get.ddl.isolation = false # parallel parser config,# if your server has only one cpu,you need open this confi and set value to false canal.instance.parser.parallel = true ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors() #canal.instance.parser.parallelThreadSize = 16 ## disruptor ringbuffer size, must be power of 2 canal.instance.parser.parallelBufferSize = 256 # table meta tsdb info canal.instance.tsdb.enable = true canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:} canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL; canal.instance.tsdb.dbUsername = canal canal.instance.tsdb.dbPassword = canal # dump snapshot interval, default 24 hour canal.instance.tsdb.snapshot.interval = 24 # purge snapshot expire , default 360 hour(15 days) canal.instance.tsdb.snapshot.expire = 360 # aliyun ak/sk , support rds/mq canal.aliyun.accessKey = canal.aliyun.secretKey = ################################################# ######### destinations ############# ################################################# canal.destinations = route # conf root dir canal.conf.dir = ../conf # auto scan instance dir add/remove and start/stop instance canal.auto.scan = true canal.auto.scan.interval = 5 canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml canal.instance.global.mode = spring canal.instance.global.lazy = false canal.instance.global.manager.address = ${canal.admin.manager} #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml #canal.instance.global.spring.xml = classpath:spring/file-instance.xml canal.instance.global.spring.xml = classpath:spring/default-instance.xml ################################################## ######### MQ ############# ################################################## canal.mq.servers = 192.168.1.227:9092,192.168.1.226:9092,192.168.1.225:9092 canal.mq.retries = 0 canal.mq.batchSize = 16384 canal.mq.maxRequestSize = 1048576 canal.mq.lingerMs = 100 canal.mq.bufferMemory = 33554432 canal.mq.canalBatchSize = 50 canal.mq.canalGetTimeout = 100 # 該配置如果設置為false,則canal不進行日志解析,只發送原生的protpbuf二進制日志,消息體相對較小,如果為true,canal會將其解析為json格式,消息體相對較大,占用存儲空間較大 canal.mq.flatMessage = true canal.mq.compressionType = none canal.mq.acks = all #canal.mq.properties. = # canal.mq.producerGroup = test # Set this value to "cloud", if you want open message trace feature in aliyun. # canal.mq.accessChannel = local # aliyun mq namespace #canal.mq.namespace = ################################################## ######### Kafka Kerberos Info ############# ################################################## canal.mq.kafka.kerberos.enable = false canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf" canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
配置文件中的配置項很多,總結一下,需要配置的項:
- instance.properties
## mysql serverId , v1.0.26+ will autoGen canal.instance.mysql.slaveId=1234 # position info canal.instance.master.address=192.168.1.218:3306 # username/password canal.instance.dbUsername=canal canal.instance.dbPassword=canal@winner canal.instance.connectionCharset = UTF-8 # table regex canal.instance.filter.regex=db1\\.user,db2\\.device canal.mq.topic=example canal.mq.dynamicTopic=bi_binlog_config_topic:db1\\.user,bi_binlog_config_topic:db2\\.device
- canal.properties
# tcp bind ip canal.ip = 192.168.1.173 # tcp, kafka, RocketMQ canal.serverMode = kafka canal.mq.flatMessage = true ################################################# ######### destinations ############# ################################################# canal.destinations =route #canal.instance.global.spring.xml = classpath:spring/file-instance.xml canal.instance.global.spring.xml = classpath:spring/default-instance.xml ################################################## ######### MQ ############# ################################################## canal.mq.servers = 192.168.1.227:9092,192.168.1.226:9092,192.168.1.225:19092 #canal.mq.producerGroup = test # Set this value to "cloud", if you want open message trace feature in aliyun. #canal.mq.accessChannel = local
- instance.properties
三、啟動驗證
啟動
進入到文件解壓目錄下,執行命令sh bin/startup.sh
啟動服務
查看日志
執行命令tail -f logs/canal/canal.log
,查看,到如下日志,說明服務啟動成功:
topic數據驗證
-
在kafka所部屬的機器上通過客戶端,進入kafka的安裝目錄,打開消費者:
bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.225:9092 --topic bi_binlog_config_topic --from-beginning
-
對所監控的數據表進行執行insert/update/delete操作,驗證topic中是否有數據過來
-
可以看到kafka對應的topic中已經可以正確收到操作解析后的消息了
四、可能遇到的問題
1. canal啟動后出現生產者發送消息失敗的錯誤
-
報錯信息
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for bi_binlog_device_topic-0: 30064 ms has passed since batch creation plus linger time 2021-04-01 10:10:50.481 [pool-4-thread-1] ERROR com.alibaba.otter.canal.kafka.CanalKafkaProducer - java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for bi_binlog_device_topic-0: 30028 ms has passed since batch creation plus linger time java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for bi_binlog_device_topic-0: 30028 ms has passed since batch creation plus linger time at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:215) ~[canal.server-1.1.4.jar:na] at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:179) ~[canal.server-1.1.4.jar:na] at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:117) ~[canal.server-1.1.4.jar:na] at com.alibaba.otter.canal.server.CanalMQStarter.worker(CanalMQStarter.java:183) [canal.server-1.1.4.jar:na] at com.alibaba.otter.canal.server.CanalMQStarter.access$500(CanalMQStarter.java:23) [canal.server-1.1.4.jar:na] at com.alibaba.otter.canal.server.CanalMQStarter$CanalMQRunnable.run(CanalMQStarter.java:225) [canal.server-1.1.4.jar:na] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_131] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131] Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for bi_binlog_device_topic-0: 30028 ms has passed since batch creation plus linger time at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.valueOrError(FutureRecordMetadata.java:94) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:64) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29) ~[kafka-clients-1.1.1.jar:na] at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:213) ~[canal.server-1.1.4.jar:na] ... 8 common frames omitted Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for bi_binlog_device_topic-0: 30028 ms has passed since batch creation plus linger time
-
原因排查
錯誤提示的是由於連接kafka集群超時引起的,但是按照網上說的
- 擴大超時限制
- 修改kafka的配置
advertised.listeners=PLAINTEXT://192.168.14.140:9092
在進行了上面的幾步操作核查之后,發現並沒有解決問題,開始找其他的出路。
報錯信息那種看到bi_binlog_device_topic-0
,想應該是canal在往這個topic中寫數據的時候出了問題,會不很可能是由於topic的問題導致了,所以由於是測試環境,打算把topic的數據清空,然后在重啟canal之后,果然問題解決可以看到已經正常讀取日志了
-
問題解決
清空topic數據解決,這里有兩種方式:
- 刪除topic,然后重新創建
# 刪除topic ./kafka-topics.sh --zookeeper localhost:2181 --delete --topic bi_binlog_config_topic # 創建topic ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic bi_binlog_config_topic
- 清空topic中的數據
# 清空數據 ./kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name bi_binlog_store_topic --alter --add-config retention.ms=10000 # 查看狀態 ./kafka-configs.sh --zookeeper localhost:2181 --describe --entity-type topics --entity-name bi_binlog_config_topic
- 刪除topic,然后重新創建
2. 連接kafka集群超時
-
報錯信息
2021-04-01 11:38:44.322 [pool-4-thread-2] ERROR com.alibaba.otter.canal.kafka.CanalKafkaProducer - java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. java.lang.RuntimeException: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:215) ~[canal.server-1.1.4.jar:na] at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:179) ~[canal.server-1.1.4.jar:na] at com.alibaba.otter.canal.kafka.CanalKafkaProducer.send(CanalKafkaProducer.java:117) ~[canal.server-1.1.4.jar:na] at com.alibaba.otter.canal.server.CanalMQStarter.worker(CanalMQStarter.java:183) [canal.server-1.1.4.jar:na] at com.alibaba.otter.canal.server.CanalMQStarter.access$500(CanalMQStarter.java:23) [canal.server-1.1.4.jar:na] at com.alibaba.otter.canal.server.CanalMQStarter$CanalMQRunnable.run(CanalMQStarter.java:225) [canal.server-1.1.4.jar:na] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_131] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_131] at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131] Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms. at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1150) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:846) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:784) ~[kafka-clients-1.1.1.jar:na] at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:671) ~[kafka-clients-1.1.1.jar:na] at com.alibaba.otter.canal.kafka.CanalKafkaProducer.produce(CanalKafkaProducer.java:199) ~[canal.server-1.1.4.jar:na] ... 8 common frames omitted
-
原因排查
出現該問題的原因是在instance.properties配置中配置了動態topic之后,把
canal.mq.topic=example
給注釋掉了導致的,所以需要把這個放開# mq config canal.mq.topic=example canal.mq.dynamicTopic=bi_binlog_store_topic:db0\\.patrol_report,bi_binlog_topic:db1\\.store1
-
問題解決
參照上一問題的解決方案
五、結束語
本文主要針對阿里開源同步工具canal做了簡單的介紹,並對具體的搭建步驟,並將數據寫入到kafka的過程做了簡要的總結,數據寫入到kafka之后,接下來的就是消費后續程序消費kafka的消息了,可以是flink、spark,...,這里做個筆記,希望能幫助到需要的人。
但是這樣是有一個問題的,就是關於canal的單點故障的問題,所以一般生產環境中,我們都需要對canal進行高可用搭建。