Parsing the MySQL binlog into Kafka with Debezium and Kafka Connect


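Goal: set up a pipeline that automatically listens for changes in a MySQL database and captures the changed data for processing. This article only covers how to capture the changes in MySQL automatically.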

Technologies used

debezium: https://debezium.io/documentation/reference/1.0/connectors/mysql.html

kafka: http://kafka.apache.org/

zookeeper: http://zookeeper.apache.org/

mysql 5.7: https://www.mysql.com/

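1. Approach

A CentOS 7.x virtual machine is required. ZooKeeper, Debezium, Kafka, and Confluent run on the virtual machine, while MySQL runs on Windows. The virtual machine listens for data changes in the MySQL instance on the Windows host.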

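2. Preparing the MySQL environment

First locate the MySQL configuration file, my.ini. On my machine it is in C:\ProgramData\MySQL\MySQL Server 5.7. Because change capture is based on the MySQL binlog, the binlog must be enabled. Add the following settings under the [mysqld] section: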

log_bin = D:\mysql-binlog\mysql-bin
binlog_format = ROW
server-id = 223344
binlog_row_image = full
expire_logs_days = 10
binlog_rows_query_log_events = on
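The settings above can be sanity-checked before MySQL is restarted. The following is a minimal Java sketch, not part of the original setup: it reads option-file text as plain key=value lines and reports which of the binlog settings listed above are absent or set differently. The class name BinlogConfigCheck is hypothetical, and the check only looks at values written explicitly in the file; server defaults are not considered.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper (not part of MySQL or Debezium): checks option-file text
// for the binlog settings used in this article.
class BinlogConfigCheck {

    // Parse "key = value" lines; skip blanks, comments (# or ;) and [section] headers.
    // Option names are normalized so that "server-id" and "server_id" compare equal.
    static Map<String, String> parse(String text) {
        Map<String, String> options = new LinkedHashMap<>();
        for (String raw : text.split("\\R")) {
            String line = raw.trim();
            if (line.isEmpty() || line.startsWith("#") || line.startsWith(";") || line.startsWith("[")) {
                continue;
            }
            int eq = line.indexOf('=');
            if (eq < 0) {
                continue; // flag-style option without a value; ignored in this sketch
            }
            String key = line.substring(0, eq).trim().toLowerCase(Locale.ROOT).replace('-', '_');
            options.put(key, line.substring(eq + 1).trim());
        }
        return options;
    }

    // Return human-readable problems; an empty list means the checked settings are present.
    static List<String> problems(Map<String, String> options) {
        List<String> problems = new ArrayList<>();
        if (!options.containsKey("log_bin")) {
            problems.add("log_bin is not set");
        }
        if (!options.containsKey("server_id")) {
            problems.add("server-id is not set");
        }
        if (!"row".equalsIgnoreCase(options.get("binlog_format"))) {
            problems.add("binlog_format is not set to ROW");
        }
        if (!"full".equalsIgnoreCase(options.get("binlog_row_image"))) {
            problems.add("binlog_row_image is not set to FULL");
        }
        return problems;
    }

    public static void main(String[] args) {
        String sample = String.join("\n",
                "[mysqld]",
                "log_bin = D:\\mysql-binlog\\mysql-bin",
                "binlog_format = Row",
                "server-id = 223344",
                "binlog_row_image = full");
        System.out.println(problems(parse(sample)));
    }
}
```

The lines are parsed by hand rather than with java.util.Properties because Properties treats a backslash as an escape character, which would alter the Windows path in log_bin.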

Restart the MySQL service:

net stop  mysql57
net start  mysql57

The MySQL binlog is now enabled. To verify it, open a cmd window, log in with mysql -u root -p, and run:

show binary logs;

The output lists the binary log files in which MySQL records its changes. MySQL is now ready.

3. Preparing ZooKeeper and Kafka

Download zookeeper-3.4.14.tar.gz and kafka_2.12-2.2.0.tar

mkdir -p  /usr/local/software/zookeeper
mkdir -p  /usr/local/software/kafka
mkdir -p  /usr/local/software/confluent

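With the directories in place, move each archive into its directory and extract it:

mv zookeeper-3.4.14.tar.gz /usr/local/software/zookeeper
mv kafka_2.12-2.2.0.tar /usr/local/software/kafka
tar -xf /usr/local/software/zookeeper/zookeeper-3.4.14.tar.gz -C /usr/local/software/zookeeper
tar -xf /usr/local/software/kafka/kafka_2.12-2.2.0.tar -C /usr/local/software/kafka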

Go to the ZooKeeper conf directory, /usr/local/software/zookeeper/zookeeper-3.4.14/conf, and edit zoo.cfg (renamed from zoo_sample.cfg):

dataDir=/opt/data/zookeeper/data
dataLogDir=/opt/data/zookeeper/logs

Go to the dataDir directory, create a file named myid, and write 1 into it.

The ZooKeeper configuration is now complete. Next configure Kafka: in /usr/local/software/kafka/kafka_2.12-2.2.0/config, edit server.properties:

broker.id=1
listeners=PLAINTEXT://192.168.91.25:9092
advertised.listeners=PLAINTEXT://192.168.91.25:9092
log.dirs=/opt/data/kafka-logs
host.name=192.168.91.25
zookeeper.connect=localhost:2181

4. Configuring Debezium

This step needs the JAR files of the Debezium connector for MySQL. Download them from https://debezium.io/releases/1.0/

 

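Upload the downloaded plugin archive to the virtual machine. After extraction the directory is named debezium-connector-mysql.

Move it to /usr/local/share/kafka/plugins, creating that directory manually if it does not exist.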

Once the dependency JARs are in place, configure Kafka Connect in the Kafka config directory.

File: /usr/local/software/kafka/kafka_2.12-2.2.0/config/connect-standalone.properties

bootstrap.servers=<this host's IP>:9092
plugin.path=/usr/local/share/kafka/plugins

In addition, create a file named mysql.properties in the Kafka root directory with the following content:

name=mysql
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=192.168.3.125
database.port=3306
database.user=root
database.password=123456
database.server.id=112233
database.server.name=test
database.whitelist=orders,users
database.history.kafka.bootstrap.servers=192.168.91.25:9092
database.history.kafka.topic=history.test
include.schema.changes=true
include.query=true
# options: adaptive_time_microseconds (default), adaptive (deprecated), connect
time.precision.mode=connect
# options: precise(default) double string
decimal.handling.mode=string
# options: long(default) precise
bigint.unsigned.handling.mode=long
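A connector file like the one above can be checked before Kafka Connect is started. The sketch below is only an illustration under stated assumptions: ConnectorConfigCheck is a hypothetical class, not a Debezium API, and the key list simply mirrors the entries this article sets (Debezium has defaults for some of them). It also checks that database.server.id differs from the server-id configured in my.ini, since the connector joins the MySQL cluster as a client with its own unique ID.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical pre-flight check for the connector file; not a Debezium API.
class ConnectorConfigCheck {

    // Keys that the mysql.properties listing in this article sets explicitly.
    static final String[] EXPECTED = {
        "name", "connector.class", "database.hostname", "database.port",
        "database.user", "database.password", "database.server.id",
        "database.server.name", "database.history.kafka.bootstrap.servers",
        "database.history.kafka.topic"
    };

    // Load properties-format text; java.util.Properties keeps trailing spaces in values.
    static Properties load(String text) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return props;
    }

    // mysqlServerId is the server-id configured in my.ini (223344 in this article).
    static List<String> problems(Properties props, String mysqlServerId) {
        List<String> problems = new ArrayList<>();
        for (String key : EXPECTED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                problems.add("missing: " + key);
            }
        }
        String connectorId = props.getProperty("database.server.id", "").trim();
        if (connectorId.equals(mysqlServerId)) {
            problems.add("database.server.id must differ from the MySQL server-id");
        }
        return problems;
    }

    public static void main(String[] args) throws IOException {
        // Path to the connector file; defaults to mysql.properties in the working directory.
        String path = args.length > 0 ? args[0] : "mysql.properties";
        String text = new String(Files.readAllBytes(Paths.get(path)), StandardCharsets.UTF_8);
        System.out.println(problems(load(text), "223344"));
    }
}
```

Run it from the Kafka root directory, or pass the path to the file as the first argument; an empty list means every listed key is present and the two server IDs differ.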

5. Starting the services

01. Start ZooKeeper

cd /usr/local/software/zookeeper/zookeeper-3.4.14
./bin/zkServer.sh start

02. Start Kafka

cd  /usr/local/software/kafka/kafka_2.12-2.2.0
./bin/kafka-server-start.sh  -daemon  config/server.properties 

03. Start Kafka Connect

cd  /usr/local/software/kafka/kafka_2.12-2.2.0
 ./bin/connect-standalone.sh  config/connect-standalone.properties  mysql.properties 

04. List the topics, in a new terminal

 ./bin/kafka-topics.sh --list --zookeeper localhost:2181

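6. Specifying the databases/tables to monitor

Use Postman to send a POST request with a JSON body to the Kafka Connect REST API (by default it listens on port 8083, under the /connectors path). The following body registers a connector that monitors the shiro database: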

{
  "name": "shiro",  
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector", 
    "database.hostname": "192.168.3.125", 
    "database.port": "3306", 
    "database.user": "root", 
    "database.password": "123456", 
    "database.server.id": "184054", 
    "database.server.name": "my", 
    "database.whitelist": "shiro", 
    "database.history.kafka.bootstrap.servers": "192.168.91.25:9092", 
    "database.history.kafka.topic": "history.shiro", 
    "include.schema.changes": "true" 
  }
}
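The same request can be sent from code instead of Postman. The sketch below is one possible way to do it, assuming Java 11 or later (for java.net.http) and assuming that Kafka Connect's REST interface is reachable on its default port 8083 on the virtual machine used in this article (192.168.91.25). RegisterConnector and buildRequest are hypothetical names introduced for illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical client that registers a connector through the Kafka Connect REST API.
class RegisterConnector {

    // Build the POST request; connectUrl is the base URL of the Connect worker.
    static HttpRequest buildRequest(String connectUrl, String json) {
        return HttpRequest.newBuilder(URI.create(connectUrl + "/connectors"))
                .header("Content-Type", "application/json")
                .header("Accept", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
    }

    public static void main(String[] args) throws Exception {
        // Same body as the Postman request in this article.
        String json = String.join("\n",
                "{",
                "  \"name\": \"shiro\",",
                "  \"config\": {",
                "    \"connector.class\": \"io.debezium.connector.mysql.MySqlConnector\",",
                "    \"database.hostname\": \"192.168.3.125\",",
                "    \"database.port\": \"3306\",",
                "    \"database.user\": \"root\",",
                "    \"database.password\": \"123456\",",
                "    \"database.server.id\": \"184054\",",
                "    \"database.server.name\": \"my\",",
                "    \"database.whitelist\": \"shiro\",",
                "    \"database.history.kafka.bootstrap.servers\": \"192.168.91.25:9092\",",
                "    \"database.history.kafka.topic\": \"history.shiro\",",
                "    \"include.schema.changes\": \"true\"",
                "  }",
                "}");

        // Assumption: the Connect worker listens on the default REST port 8083.
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                buildRequest("http://192.168.91.25:8083", json),
                HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

A 2xx status code in the printed response indicates that the worker accepted the connector; any other status is accompanied by an error message in the body.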

List the topics again.

In a new terminal, start a Kafka console consumer on the my.shiro.user topic:

./bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092 --topic my.shiro.user --from-beginning
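The topic name my.shiro.user follows the Debezium MySQL connector's naming scheme: the value of database.server.name, then the database name, then the table name, joined by dots. When include.schema.changes is true, schema change events go to a topic named after the server name alone. A small sketch of that rule, with DebeziumTopicNames as a hypothetical helper class:

```java
// Hypothetical helper illustrating how the Debezium MySQL connector names its topics.
class DebeziumTopicNames {

    // Change events for one table: <server name>.<database>.<table>
    static String tableTopic(String serverName, String database, String table) {
        return serverName + "." + database + "." + table;
    }

    // Schema change (DDL) events: a topic named after the logical server name.
    static String schemaChangeTopic(String serverName) {
        return serverName;
    }

    public static void main(String[] args) {
        // Values from the connector registered in this article.
        System.out.println(tableTopic("my", "shiro", "user"));
        System.out.println(schemaChangeTopic("my"));
    }
}
```

With the mysql.properties file from earlier (database.server.name=test, database.whitelist=orders,users), the same rule yields topics that start with test.orders. and test.users.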

Java client consumer code

package kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

/**
 * Created by baizhuang on 2019/10/25 10:39.
 */

public class MyConsumer {
    public static void main(String[] args) {

        // 1. Create the Kafka consumer configuration.
        Properties properties = new Properties();

        // 2. Kafka cluster to connect to.
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.91.25:9092");

        // Commit offsets automatically, once per second.
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");

        // Key and value deserializers.
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

        // Consumer group.
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Subscribe to the change-event topic of the shiro.user table.
        consumer.subscribe(Collections.singletonList("my.shiro.user"));

        while (true) {
            // poll(Duration) replaces the deprecated poll(long).
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(100));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.key() + "-----" + consumerRecord.value());
            }
        }
    }
}

Java client producer code

package kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Created by baizhuang on 2019/10/24 16:58.
 */

public class MyProducer {
    public static void main(String[] args) {

        // 1. Create the Kafka producer configuration.
        Properties properties = new Properties();

        // 2. Kafka cluster to connect to.
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.91.25:9092");

        // 3. Wait for all in-sync replicas to acknowledge each record.
        properties.put("acks", "all");

        // 4. Number of retries.
        properties.put("retries", 0);

        // 5. Batch size in bytes.
        properties.put("batch.size", 16384);

        // 6. Time to wait for more records before sending a batch, in milliseconds.
        properties.put("linger.ms", 1);

        // 7. Size of the RecordAccumulator buffer in bytes.
        properties.put("buffer.memory", 33554432);

        // 8. Key and value serializers.
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 9. Create the producer.
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i < 10; i++) {
            // 10. Send a record.
            String key = String.valueOf(i);
            String value = "message " + key;
            producer.send(new ProducerRecord<>("mytopic", key, value));
            System.out.println("msg:" + i);
        }

        // Flush pending records and release resources.
        producer.close();
    }
}

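Start a consumer and then modify the user table in the shiro database. Both the Java client consumer and the Linux console consumer display the changed data as it arrives.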
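Each value printed by the consumers is a Debezium change event whose op field says what happened to the row: c for an insert, u for an update, d for a delete, and r for a row read during the initial snapshot. The sketch below classifies an event by that field. It is deliberately simplistic: ChangeEventOp is a hypothetical class, the sample event in main is a trimmed, made-up example, and the regular expression is a stand-in for a real JSON parser, so it can be misled by row data that happens to contain an "op" key.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: classify a Debezium change event by its "op" field.
// A regular expression is used only to keep the sketch dependency-free;
// production code should parse the JSON properly.
class ChangeEventOp {

    private static final Pattern OP = Pattern.compile("\"op\"\\s*:\\s*\"([a-z])\"");

    static String describe(String eventJson) {
        if (eventJson == null) {
            // A record with a null value is a tombstone, emitted after a delete by default.
            return "tombstone";
        }
        Matcher matcher = OP.matcher(eventJson);
        if (!matcher.find()) {
            return "unknown";
        }
        switch (matcher.group(1)) {
            case "c": return "insert";
            case "u": return "update";
            case "d": return "delete";
            case "r": return "snapshot read";
            default:  return "unknown";
        }
    }

    public static void main(String[] args) {
        // Trimmed, made-up event for illustration only.
        String sample = "{\"payload\":{\"before\":null,\"after\":{\"id\":1},\"op\":\"c\"}}";
        System.out.println(describe(sample));
    }
}
```

In the consumer loop, describe(consumerRecord.value()) could be printed next to each record to show which kind of change it carries.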

 

