下載以下文件,解壓,放置到kafka的libs目錄
從這里選擇適合的mysql connector
mysql-connector-java-8.0.16.jar
將里面的jar文件提取出來,也放到kafka的libs目錄
在config目錄下創建 connect-mysql-source.properties
創建 A數據庫源表person
CREATE TABLE `person` ( `pid` int(11) NOT NULL AUTO_INCREMENT, `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL, `age` int(11) DEFAULT NULL, PRIMARY KEY (`pid`) ) ENGINE=InnoDB DEFAULT CHARSET=latin1;
創建 B數據庫目標表kafkaperson
CREATE TABLE `kafkaperson` ( `pid` int(11) NOT NULL AUTO_INCREMENT, `firstname` varchar(255) CHARACTER SET utf8 DEFAULT NULL, `age` int(11) DEFAULT NULL, PRIMARY KEY (`pid`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
connect-mysql-source.properties 內容為
name=mysql-a-source-person connector.class=io.confluent.connect.jdbc.JdbcSourceConnector tasks.max=1 connection.url=jdbc:mysql://127.0.0.1:3306/a_db?user=root&password=root # incrementing 自增 mode=incrementing # 自增字段 pid incrementing.column.name=pid # 白名單表 person table.whitelist=person # topic前綴 mysql-kafka- topic.prefix=mysql-kafka-
connect-mysql-sink.properties 內容
name=mysql-a-sink-person connector.class=io.confluent.connect.jdbc.JdbcSinkConnector tasks.max=1 #kafka的topic名稱 topics=mysql-kafka-person # 配置JDBC鏈接 connection.url=jdbc:mysql://127.0.0.1:3306/b_db?user=root&password=root # 不自動創建表,如果為true,會自動創建表,表名為topic名稱 auto.create=false # upsert model更新和插入 insert.mode=upsert # 下面兩個參數配置了以pid為主鍵更新 pk.mode = record_value pk.fields = pid #表名為kafkatable table.name.format=kafkaperson
啟動kafka
參考 kafka安裝
如果報 The server time zone value ” is unrecognized or represents more than one time 。。。
以命令行進入mysql
mysql> show variables like '%time_zone%'; +------------------+--------+ | Variable_name | Value | +------------------+--------+ | system_time_zone | | | time_zone | SYSTEM | +------------------+--------+ 2 rows in set
如果輸出結果是system
設置time_zone
即可
mysql> set global time_zone='+8:00'; Query OK, 0 rows affected;


稍微有點延遲、並且只有添加才會同步,更新、刪除都不行。