debezium sql server 集成


debezium 是一個方便的cdc connector 可以幫助我們解決好多數據實時變更處理、數據分析、微服務的數據通信
從上次跑簡單demo到現在,這個工具是有好多的變更,添加了好多方便的功能,支持了越來越多的數據庫。
demo 使用了官方提供的docker-compose 文件

環境准備

  • docker-compose 文件
version: '2'
services:
  zookeeper:
    image: debezium/zookeeper:0.9
    ports:
     - 2181:2181
     - 2888:2888
     - 3888:3888
  kafka:
    image: debezium/kafka:0.9
    ports:
     - 9092:9092
    links:
     - zookeeper
    environment:
     - ZOOKEEPER_CONNECT=zookeeper:2181
  sqlserver:
    image: microsoft/mssql-server-linux:2017-CU8
    ports:
     - 1433:1433
    environment:
     - ACCEPT_EULA=Y
     - MSSQL_PID=Standard
     - SA_PASSWORD=Password!
  connect:
    image: debezium/connect:0.9
    ports:
     - 8083:8083
    links:
     - kafka
     - sqlserver
    environment:
     - BOOTSTRAP_SERVERS=kafka:9092
     - GROUP_ID=1
     - CONFIG_STORAGE_TOPIC=my_connect_configs
     - OFFSET_STORAGE_TOPIC=my_connect_offsets
 
 
  • 啟動
docker-compose up -d
 
  • 加載sql server 數據
    可以使用ui (azure data studio )工具或者命令行(inside docker),demo 提供了一個sql 腳本文件
cat debezium-sqlserver-init/inventory.sql | docker exec -i tutorial_sqlserver_1 bash -c '/opt/mssql-tools/bin/sqlcmd -U sa -P $SA_PASSWORD'
 

效果

  • 啟動connector
 
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
 

register-sqlserver.json 內容

{
    "name": "inventory-connector",
    "config": {
        "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
        "tasks.max" : "1",
        "database.server.name" : "server1",
        "database.hostname" : "sqlserver",
        "database.port" : "1433",
        "database.user" : "sa",
        "database.password" : "Password!",
        "database.dbname" : "testDB",
        "database.history.kafka.bootstrap.servers" : "kafka:9092",
        "database.history.kafka.topic": "schema-changes.inventory"
    }
}
 
 

效果

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-sqlserver.json
HTTP/1.1 201 Created
Date: Tue, 25 Dec 2018 11:26:05 GMT
Location: http://localhost:8083/connectors/inventory-connector
Content-Type: application/json
Content-Length: 462
Server: Jetty(9.4.12.v20180830)
{"name":"inventory-connector","config":{"connector.class":"io.debezium.connector.sqlserver.SqlServerConnector","tasks.max":"1","database.server.name":"server1","database.hostname":"sqlserver","database.port":"1433","database.user":"sa","database.password":"Password!","database.dbname":"testDB","database.history.kafka.bootstrap.servers":"kafka:9092","database.history.kafka.topic":"schema-changes.inventory","name":"inventory-connector"},"tasks":[],"type":null}%
 
 

測試

  • 打開消費者(kafka 命令行)
docker-compose -f docker-compose-sqlserver.yaml exec kafka /kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server kafka:9092 \
    --from-beginning \
    --property print.key=true \
    --topic server1.dbo.customers
 
 
  • 修改數據&&查看效果

說明

注意debezium 本地,0.9 支持sql server

參考資料

https://github.com/debezium/debezium-examples/tree/master/tutorial#using-sql-server
https://debezium.io/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM