在kafka connect 同步 mysql 主從數據庫


下載以下文件,解壓,放置到kafka的libs目錄

kafka-connect-jdbc-4.1.1

從這里選擇適合的mysql connector

mysql-connector-java-8.0.16.jar

將里面的jar文件提取出來,也放到kafka的libs目錄

在config目錄下創建 connect-mysql-source.properties

創建 A數據庫源表person

CREATE TABLE `person` (
  `pid` int(11) NOT NULL AUTO_INCREMENT,
  `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`pid`)
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

創建 B數據庫目標表kafkaperson

CREATE TABLE `kafkaperson` (
  `pid` int(11) NOT NULL AUTO_INCREMENT,
  `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
  `age` int(11) DEFAULT NULL,
  PRIMARY KEY (`pid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

connect-mysql-source.properties 內容為

name=mysql-a-source-person
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://127.0.0.1:3306/a_db?user=root&password=root
# incrementing  自增
mode=incrementing
# 自增字段  pid
incrementing.column.name=pid
# 白名單表  person
table.whitelist=person
# topic前綴   mysql-kafka-
topic.prefix=mysql-kafka-

connect-mysql-sink.properties 內容

name=mysql-a-sink-person
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
tasks.max=1
#kafka的topic名稱
topics=mysql-kafka-person
# 配置JDBC鏈接
connection.url=jdbc:mysql://127.0.0.1:3306/b_db?user=root&password=root
# 不自動創建表,如果為true,會自動創建表,表名為topic名稱
auto.create=false
# upsert model更新和插入
insert.mode=upsert
# 下面兩個參數配置了以pid為主鍵更新
pk.mode = record_value
pk.fields = pid
#表名為kafkatable
table.name.format=kafkaperson

啟動kafka

參考 kafka安裝

如果報 The server time zone value ” is  unrecognized or represents more than one time 。。。

以命令行進入mysql

mysql> show variables like '%time_zone%'; +------------------+--------+ | Variable_name | Value | +------------------+--------+ | system_time_zone | | | time_zone | SYSTEM | +------------------+--------+ 2 rows in set

如果輸出結果是system

設置time_zone即可

mysql> set global time_zone='+8:00'; Query OK, 0 rows affected;




稍微有點延遲、並且只有添加才會同步,更新、刪除都不行。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM