debezium 數據變更工具使用


1.  作用
簡單概述就是CDC(change data capture),實時數據分析領域用的比較多
 
2. 簡單使用(基於官網的docker 說明)
 備注: 測試沒有使用守護進程模式為了方便測試
a. zookeeper
docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:0.6
 
b. kafka
docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:0.6
 
c. mysql 
docker run -it --rm --name mysql -p 3306:3306 -e MYSQL_ROOT_PASSWORD=debezium -e MYSQL_USER=mysqluser -e MYSQL_PASSWORD=mysqlpw debezium/example-mysql:0.6
 
d. mysql-client
docker run -it --rm --name mysqlterm --link mysql --rm mysql:5.7 sh -c 'exec mysql -h"$MYSQL_PORT_3306_TCP_ADDR" -P"$MYSQL_PORT_3306_TCP_PORT" -uroot -p"$MYSQL_ENV_MYSQL_ROOT_PASSWORD"'
 
查看表信息
use inventory;
show tables;
SELECT * FROM customers;
 
e. kafka connect
docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets --link zookeeper:zookeeper --link kafka:kafka --link mysql:mysql debezium/connect:0.6
 
 進行connect 信息查看
curl -H "Accept:application/json" localhost:8083/
curl -H "Accept:application/json" localhost:8083/connectors/
 
f. 監控MySQL 數據庫變更
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "inventory-connector", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "mysql", "database.port": "3306", "database.user": "debezium", "database.password": "dbz", "database.server.id": "184054", "database.server.name": "dbserver1", "database.whitelist": "inventory", "database.history.kafka.bootstrap.servers": "kafka:9092", "database.history.kafka.topic": "dbhistory.inventory" } }'

返回信息如下:
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.whitelist": "inventory",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}
查看注冊的connect
 curl -H "Accept:application/json" localhost:8083/connectors/
 ["inventory-connector"]
g. 查看數據變更
docker run -it --name watcher --rm --link zookeeper:zookeeper debezium/kafka:0.6 watch-topic -a -k dbserver1.inventory.customers

應該會看到包含下面的信息:
{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key"
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}

進行數據變更
UPDATE customers SET first_name='Anne Marie' WHERE id=1004;
應該會看到下面的信息
{
  "schema": {
    "type": "struct",
    "name": "dbserver1.inventory.customers.Key"
    "optional": false,
    "fields": [
      {
        "field": "id",
        "type": "int32",
        "optional": false
      }
    ]
  },
  "payload": {
    "id": 1004
  }
}

刪除、添加的信息類似,同時對於監控數據變更的服務,在停止之后,重新啟動數據還是可以同步過來的
 
 
3. 說明
大數據,微服務應用開發、單體應用向微服務遷移的時候使用起來可以減少好多開發的工作量
4. 參考資料
http://debezium.io/docs/tutorial/
遺留系統重建實踐
Migrating_to_Microservices_Databases_Red_Hat.pdf (https://developers.redhat.com/promotions/migrating-to-microservice-databases/)
 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM