canal實時同步mysql表數據到Kafka


准備

  • 對於自建 MySQL , 需要先開啟 Binlog 寫入功能,配置 binlog-format 為 ROW 模式,my.cnf 中配置如下

    [mysqld]
    log-bin=mysql-bin # 開啟 binlog
    binlog-format=ROW # 選擇 ROW 模式
    server_id=1 # 配置 MySQL replaction 需要定義,不要和 canal 的 slaveId 重復
    • 注意:針對阿里雲 RDS for MySQL , 默認打開了 binlog , 並且賬號默認具有 binlog dump 權限 , 不需要任何權限或者 binlog 設置,可以直接跳過這一步
  • 授權 canal 鏈接 MySQL 賬號具有作為 MySQL slave 的權限, 如果已有賬戶可直接 grant

    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
    

      

安裝canal.server

1 下載壓縮包

到官網地址(release)下載最新壓縮包,請下載 canal.deployer-latest.tar.gz

2 將canal.deployer 復制到固定目錄並解壓

mkdir -p /usr/local/canal
cp   canal.deployer-1.1.1.tar.gz   /usr/local/canal
tar -zxvf canal.deployer-1.1.1.tar.gz 

3 配置修改參數

a. 修改instance 配置文件,修改單個數據源的配置 vi conf/example/instance.properties

#  按需修改成自己的數據庫信息
#################################################
...
#需要同步的mysql地址,端口 canal.instance.master.address=192.168.1.20:3306 # username/password,數據庫的用戶名和密碼 ... canal.instance.dbUsername = root canal.instance.dbPassword = 123456
#需要同步的具體表,也可以使用正則表達式監聽多張表或者庫

canal.instance.filter.regex=realtime.tpp_vehicle_through_points_history ... # mq config
# topic的名稱 canal.mq.topic=topic_start # 針對庫名或者表名發送動態topic #canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 # 庫名.表名: 唯一主鍵,多個表之間用逗號分隔 #canal.mq.partitionHash=mytest.person:id,mytest.role:id #################################################

b. 修改canal 配置文件,修改 Canal 全局設置  vi ./conf/canal.properties

# ...
# 可選項: tcp(默認), kafka, RocketMQ
# 更改模式,直接把數據扔進 Kafka canal.serverMode = kafka # ... # kafka/rocketmq 集群配置: 192.168.1.117:9092,192.168.1.118:9092,192.168.1.119:9092
## Kafka 的地址 canal.mq.servers = 10.255.xx.xx:9092,10.255.xx.xx:9092,10.255.xx.xx:9092 canal.mq.retries = 0 # flagMessage模式下可以調大該值, 但不要超過MQ消息體大小上限 canal.mq.batchSize = 16384 canal.mq.maxRequestSize = 1048576 # flatMessage模式下請將該值改大, 建議50-200 canal.mq.lingerMs = 1 canal.mq.bufferMemory = 33554432 # Canal的batch size, 默認50K, 由於kafka最大消息體限制請勿超過1M(900K以下) canal.mq.canalBatchSize = 50 # Canal get數據的超時時間, 單位: 毫秒, 空為不限超時 canal.mq.canalGetTimeout = 100 # 是否為flat json格式對象 #canal.mq.flatMessage = false
canal.mq.flatMessage = true # 使用文本格式(JSON)進行傳輸,否則 Kafka 里扔進去的是二進制數據,雖然不影響,但是看起來不方便 canal.mq.compressionType = none canal.mq.acks = all # kafka消息投遞是否使用事務 canal.mq.transaction = false

  詳細參數配置請看:https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

  • 開啟 Canal bin/startup.sh 
  • 查看日志是否有異常 vim logs/canal/canal.log vim logs/test/test.log
  • 測試,連上數據庫嘗試執行更改 SQL
  • 啟動kafka的消費監控:kafka-console-consumer --bootstrap-server app02:9092 --from-beginning --topic topic_start

     

     看得到同步的表名稱和同步的數據類型:UPDATE,INSERT,DELETE

  • 如果出現一些意外導致數據錯誤,數據不能采集,刪除/opt/zp/canal/conf/example/h2.mv.db和meta.dat。重啟canal


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM