Canal——canal server 讀取 binlog 到 kafka 然后在使用 canal-adapter


 

前言

本篇只介紹跟 Kafka模式 相關的配置。

 

一、架構

 

二、canal-server 配置

  • 修改canal 配置文件: vi /usr/local/canal/conf/canal.properties

...
# 可選項: tcp(默認), kafka, RocketMQ
canal.serverMode = kafka
...
# kafka
/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092 canal.mq.servers = 127.0.0.1:6667 canal.mq.retries = 0 # flagMessage模式下可以調大該值, 但不要超過MQ消息體大小上限 canal.mq.batchSize = 16384 canal.mq.maxRequestSize = 1048576 # flatMessage模式下請將該值改大, 建議50-200 canal.mq.lingerMs = 1 canal.mq.bufferMemory = 33554432 # Canal的batch size, 默認50K, 由於kafka最大消息體限制請勿超過1M(900K以下) canal.mq.canalBatchSize = 50 # Canal get數據的超時時間, 單位: 毫秒, 空為不限超時 canal.mq.canalGetTimeout = 100 # 是否為flat json格式對象 canal.mq.flatMessage = false canal.mq.compressionType = none canal.mq.acks = all # kafka消息投遞是否使用事務 canal.mq.transaction = false
  • 修改instance 配置文件: vi conf/example/instance.properties

#  按需修改成自己的數據庫信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,數據庫的用戶名和密碼
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...
# mq config
canal.mq.topic=rds_test
# 針對庫名或者表名發送動態topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..* canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#庫名.表名: 唯一主鍵,多個表之間用逗號分隔
#canal.mq.partitionHash=mytest.person:id,mytest.role:id
#################################################

 

三、canal-adapter 配置

  • 修改adapter 配置文件:vi conf/application.yml
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: kafka # tcp kafka rocketMQ
#  canalServerHost: 127.0.0.1:11111
#  zookeeperHosts: slave1:2181 mqServers: 127.0.0.1:9092 #or rocketmq
#  flatMessage: true
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:

  canalAdapters:
  - instance: rds_test # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
       - name: rdb
         key: oracle1
         properties:
           jdbc.driverClassName: oracle.jdbc.OracleDriver
           jdbc.url: jdbc:oracle:thin:@192.168.0.131:1521/orcl
           jdbc.username: system
           jdbc.password: manager
    - groupId: g2
      outerAdapters:
       - name: rdb
         key: oracle2
         properties:
           jdbc.driverClassName: oracle.jdbc.OracleDriver
           jdbc.url: jdbc:oracle:thin:@192.168.0.131:1521/orcl
           jdbc.username: system
           jdbc.password: manager

說明: 一份數據可以被多個 group 同時消費, 多個 group 之間會是一個並行執行, 一個 group 內部是一個串行執行多個 outerAdapters。

 

 

參考:

https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM