項目需求
一個數據庫的大表(百萬級別),每次group后進行sum操作,比較耗時,借助kafka實現實時統計分析
搭建測試環境
為了測試方法,基於本地的docker進行部署(yml文件下載),里面包括了:Zookpper,Kafka服務,schema registry, kafka connect, ksqldb等
-
docker安裝成功后,基於kafka客戶端工具進行測試,這里需要連接部署kafka服務的端口
-
基於docker命令查看部署服務的network名稱,如果之前以及基於docker安裝了sqlserver,則需要將其修改為同一個network
安裝Linux版本的sqlserver數據庫(也可以編輯上面的yml文件,將sqlserver和kafka服務一起安裝,這樣它們就在同一個network了)
- 基於Docker進行安裝
docker pull mcr.microsoft.com/mssql/server:2017-latest
- 啟動sqlserver,需要指定network
docker run -e "ACCEPT_EULA=Y" -e "SA_PASSWORD=xxxx" -p 11433:1433 --name sqlserver --network kafkawithconnect_default -d mcr.microsoft.com/mssql/server:2017-latest
- 如果需要修改sqlserver的network,則可以通過下面命令進行操作
- 先連接新的網橋:
docker network connect new_network 容器ID
(new_network是新名稱) - 再斷開舊的網橋:
docker network disconnect old_network 容器ID
(old_network 是舊名稱)
- 先連接新的網橋:
- 開啟數據庫的capture功能
-- Enable Database for CDC template -- ==== USE MyDB GO EXEC sys.sp_cdc_enable_db GO
- 開啟table的監聽功能
USE MyDB GO EXEC sys.sp_cdc_enable_table @source_schema = N'dbo', @source_name = N'MyTable', @role_name = NULL, @supports_net_changes = 1 GO
- 啟動sqlserver的代理服務
- 進入sqlserver容器 :
docker exec -it --user root sqlserver bash
- 執行命令:
/opt/mssql/bin/mssql-conf set sqlagent.enabled true
- 順便記錄下其內網ip,后面會用,比如是 172.18.0.4
- 重啟sql容器
- 進入sqlserver容器 :
- 查看數據庫的代理狀態
kafka服務關聯sqlserver
基於kafka-connect進行配置
- 通過docker exec進入容器kafka-connect
- 基於curl來配置和數據庫的連接
curl -k -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d ' { "name": "nonpharm_copy_connect2", "config": { "connector.class": "io.debezium.connector.sqlserver.SqlServerConnector", "database.hostname": "172.18.0.4", //sqlserver 內網ip "database.port": "1433", "database.user": "sa", "database.password": "xxx", "database.dbname": "DemoData", //database name "database.server.name": "cf037dc10467", //數據庫服務名稱,如果是基於docker部署,也是容器的名稱 "table.whitelist": "dbo.nonpharm_copy2", //table name "database.history.kafka.bootstrap.servers": "broker:29092", "database.history.kafka.topic": "sshist_nonpharm_copy2", "transforms.unwrap.delete.handling.mode":"rewrite", "transforms":"unwrap,route", "transforms.route.type":"org.apache.kafka.connect.transforms.RegexRouter", "transforms.route.regex":"([^.]+)\\.([^.]+)\\.([^.]+)", "transforms.unwrap.drop.tombstones":"false", "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState", "transforms.route.replacement":"sqlserv_$3" } }'
- 運行后查看狀態
curl -k http://localhost:8083/connectors/nonpharm_copy_connect2/status
,這一步很關鍵,要確保是running狀態並且沒有任何錯誤,然后通過kafka客戶端能看到對應的topic,這個算是原始的數據流,修改數據庫的table,這里就會有新值
- 刪除connect的命令
curl -X DELETE http://localhost:8083/connectors/<connector-name>
基於KSql進行數據操作
- 進入到KSql服務
docker exec --interactive --tty ksqldb ksql http://localhost:8088
- 如果需要從數據的開始進行設置,則需要設置
SET 'auto.offset.reset' = 'earliest';
- 通過命令查看當前的topic/stream/table
Show topics; show streams; show tables;
- 創建Stream :
create stream Pharm_source_s_all with (kafka_topic='sqlserv_PharmXInvoiceItem_V2',value_format='avro');
- 也可以創建Table,要看具體需求,簡單來說,stream相當於記錄數據庫每行的每個動作(無邊界且不可修改,消費的時候數據庫的變化會實時顯示),而table是記錄最新值(有邊界,運行一次后,數據庫的實時更新不會同步,相當於展示的是stream當時的快照)。
- 基於原始的topic創建table
create table NonPharm_copy_source (Id bigint PRIMARY key,MychemID int,ExtendedCostExGst double,QuantitySupplied double) with (kafka_topic='sqlserv_nonpharm_copy2',value_format='avro');
- 基於原始的topic創建table
- 如果數據都是追加而沒有更新的,並且需要基於客戶端實時顯示,則應該創建Stream
- 和數據庫進行關聯的topic,要基於avro格式來創建。
- 也可以創建Table,要看具體需求,簡單來說,stream相當於記錄數據庫每行的每個動作(無邊界且不可修改,消費的時候數據庫的變化會實時顯示),而table是記錄最新值(有邊界,運行一次后,數據庫的實時更新不會同步,相當於展示的是stream當時的快照)。
- 創建成功后,可以通過describe tablename/streamname來查看數據類型;也可以通過select查詢來測試這個最新創建的table
- 基於業務需要,設置需要實時匯總的數據查詢
create table NonPharm_cpoy_live as select MychemID,SUM(ExtendedCostExGst) as totalMoney ,SUM(QuantitySupplied) as totalCount,SUM(ExtendedCostExGst)/SUM(QuantitySupplied) as unitprice from NONPHARM_copy_SOURCE group by MychemID emit changes;
創建成功后,會顯示NonPharm_cpoy_live這個topic
- 通過
print 'NONPHARM_CPOY_LIVE' from BEGINNING;
查詢實時匯總的數據
到這里,基於kafka監聽sqlserver數據實時變化並且輸出匯總數據的服務器端配置基本就結束了,還有幾點需要注意的:
- 基於yml創建docker的時候,如果不是在本機操作,則一定需要將這個IP改完部署服務的ip,並且在kafka客戶端連接的配置也需要用這個IP
- 測試的時候,如果sqlserver數據類型是decimal,導入到ksql后進行相除匯總,如果除不盡,則返回的是null,所以后面是先在sqlserver將decimal轉為float后再導入到KSQL,如果有高手能指點這塊,非常感謝!
- 創建新的table最好使用新名字(即使之前的table已經被刪除了)
- 如果需要客戶端每次從頭開始消費,則需要設置不同的GroupId
- Table VS Stream
- 理論上,我們可以不通過KSQL進行匯總,而是再各自的客戶端進行匯總,但是通過KSQL匯總的好處是匯總一次,其它客戶端或者數據看板可以直接使用,比較方便
- 如果是基於JSON的數據源,則消費的時候沒什么特別的;但如果是基於AVRO的格式,則需要再消費的時候配置反序列化的方式,以NET Core為例:
var consumer = new ConsumerBuilder<int, GenericRecord>(config).SetValueDeserializer(new AvroDeserializer<GenericRecord>(schemaRegistry).AsSyncOverAsync()).Build()
Demo下載