前言
本篇只介紹跟 Kafka模式 相關的配置。
- TCP模式 請參考文章:【Canal——增量同步MySQL數據到ElasticSearch】
- 高可用 請參考文章:【Canal——高可用架構設計與應用】
一、架構
二、canal-server 配置
-
修改canal 配置文件:
vi /usr/local/canal/conf/canal.properties
... # 可選項: tcp(默認), kafka, RocketMQ canal.serverMode = kafka ...
# kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 canal.mq.servers = 127.0.0.1:6667 canal.mq.retries = 0 # flagMessage模式下可以調大該值, 但不要超過MQ消息體大小上限 canal.mq.batchSize = 16384 canal.mq.maxRequestSize = 1048576 # flatMessage模式下請將該值改大, 建議50-200 canal.mq.lingerMs = 1 canal.mq.bufferMemory = 33554432 # Canal的batch size, 默認50K, 由於kafka最大消息體限制請勿超過1M(900K以下) canal.mq.canalBatchSize = 50 # Canal get數據的超時時間, 單位: 毫秒, 空為不限超時 canal.mq.canalGetTimeout = 100 # 是否為flat json格式對象 canal.mq.flatMessage = false canal.mq.compressionType = none canal.mq.acks = all # kafka消息投遞是否使用事務 canal.mq.transaction = false
-
修改instance 配置文件:
vi conf/example/instance.properties
# 按需修改成自己的數據庫信息 ################################################# ... canal.instance.master.address=192.168.1.20:3306 # username/password,數據庫的用戶名和密碼 ... canal.instance.dbUsername = canal canal.instance.dbPassword = canal ... # mq config canal.mq.topic=rds_test # 針對庫名或者表名發送動態topic #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 #庫名.表名: 唯一主鍵,多個表之間用逗號分隔 #canal.mq.partitionHash=mytest.person:id,mytest.role:id #################################################
三、canal-adapter 配置
- 修改adapter 配置文件:vi conf/application.yml
server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: kafka # tcp kafka rocketMQ # canalServerHost: 127.0.0.1:11111 # zookeeperHosts: slave1:2181 mqServers: 127.0.0.1:9092 #or rocketmq # flatMessage: true batchSize: 500 syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: canalAdapters: - instance: rds_test # canal instance Name or mq topic name groups: - groupId: g1 outerAdapters: - name: rdb key: oracle1 properties: jdbc.driverClassName: oracle.jdbc.OracleDriver jdbc.url: jdbc:oracle:thin:@192.168.0.131:1521/orcl jdbc.username: system jdbc.password: manager - groupId: g2 outerAdapters: - name: rdb key: oracle2 properties: jdbc.driverClassName: oracle.jdbc.OracleDriver jdbc.url: jdbc:oracle:thin:@192.168.0.131:1521/orcl jdbc.username: system jdbc.password: manager
說明: 一份數據可以被多個 group 同時消費, 多個 group 之間會是一個並行執行, 一個 group 內部是一個串行執行多個 outerAdapters。
參考:
https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart