背景
在各類物聯網項目中,設備產生的消息不僅僅作用於設備之間,還需要供業務系統使用以實現如安全審計、流量計費、數據統計、通知觸發等功能,類似很容易通過以下原型系統完成:
該原型中需要在 EMQ X 上維護多個數據通道,以供每個業務環節按照各自需求從 EMQ X 中獲取消息數據。這種解決方案的問題在於:
- 每個業務需要與 EMQ X 建立數據通道,數據通道的建立與保持需要額外的資源開銷,數據同步速度嚴重影響 EMQ X 高速消息交換;
- 隨着業務增長,每次新增業務環節都需要牽動整個系統變更;
- 由於每個環節處理速度與時序不一樣,消息量較大時部分業務會出現阻塞情況,進一步產生數據丟失、系統穩定性降低等嚴重后果。
以上問題與當下互聯網應用中遇到的問題高度一致,即多個業務系統之間的數據集成與數據同步問題。互聯網應用中普遍集成消息隊列以進行削峰、限流、隊列處理等操作,實現數據與業務的解耦,借助 EMQ X 提供的 RabbitMQ、Kafka、RocketMQ、Pulsar 等消息與流中間件橋接功能,物聯網項目也可以使用該模型來解決以上問題。
本文以常見物聯網使用場景為例,介紹了如何利用 EMQ X 消息中間件與開源流處理平台 Kafka 處理物聯網海量消息數據,以高可靠、高容錯的方式存儲海量數據流並保證數據流的順序進行消息數據存儲,同時有效地將消息數據提供給多個業務環節使用。
業務場景
假設現在有一個智能門鎖項目,所有門鎖每間隔 1 分鍾或任何時間開/關鎖等門鎖狀態變更時上報一次門鎖信息,上報 MQTT 主題如下(QoS = 1):
devices/{client_id}/state
每個設備發送的數據格式為 JSON,包括門鎖電量、開鎖狀態、操作結果等數據,內容如下:
{
"process_id": "7802441525528958",
"action": "unlock",
"battery": 83.4,
"lock_state": 1,
"version": 1.1,
"client_id": "10083618796833171"
}
每個門鎖均訂閱一個唯一的主題,作為遠程下發開鎖指令,下發 MQTT 主題如下(QoS = 1):
devices/{client_id}/command
下發的數據包括開鎖指令、消息加密驗證信息等:
{
"process_id": "7802441525528958",
"action": "unlock",
"nonce_str": "u7u4p0n8",
"ts": 1574744434,
"sign": "e9f5af7deaa28563"
}
上行、下行消息數據需要供以下三個業務環節使用:
- 消息通知:將開鎖狀態通知到門鎖用戶綁定的通知方式(手機短信、郵件);
- 狀態監控:分析處理門鎖定時上報的狀態信息,如果電量、狀態異常等需觸發告警通知用戶;
- 安全審計:分析上下行消息數據,記錄用戶開鎖行為,同時防范下行指令被篡改、重放等方式攻擊。
該方案中,EMQ X 會將以上主題的消息統一橋接到 Kafka 供業務系統使用,實現業務系統與 EMQ X 解耦。
client_id 為門鎖 ID,同門鎖連接至 EMQ X 使用的 MQTT Client ID。
方案介紹
Kafka 是由 Apache 軟件基金會開發的一個開源流處理平台,由 Scala 和 Java 編寫。該項目的目標是為處理實時數據提供一個統一、高吞吐、低延遲的平台。
kafka 有以下特性:
- 高吞吐量:吞吐量高達數十萬高並發,支持數千個客戶端同時讀寫;
- 低延遲:延遲最低只有幾毫秒,輕松構建實時流應用程序;
- 數據可靠性:將消息數據安全地分布式存儲,復制到容錯集群中,嚴格按照隊列順序處理,提供消息事務支持,保證數據完整性和消費可靠性;
- 集群容錯性:多節點副本中,允許 n-1 個節點失敗
- 可擴展性:支持集群動態擴展。
該方案中集成 Kafka 為 EMQ X 消息服務器與應用程序之間的消息傳遞提供消息隊列與消息總線。生產者(EMQ X)往隊列末尾添加數據,每個消費者(業務環節)依次讀取數據然后自行處理,這種架構兼顧了性能與數據可靠性,並有效降低系統復雜度、提升系統擴展性。該方案原型如下:
EMQ X Enterprise 安裝
安裝
如果您是 EMQ X 新手用戶,推薦通過 EMQ X 指南 快速上手
訪問 EMQ 官網 下載適合您操作系統的安裝包,由於數據持久化是企業功能,您需要下載 EMQ X 企業版(可以申請 License 試用) 寫本文的時候 EMQ X 企業版最新版本為 v3.4.4,下載 zip 包的啟動步驟如下 :
## 解壓下載好的安裝包
unzip emqx-ee-macosx-v3.4.4.zip
cd emqx
## 將 License 文件復制到 EMQ X 指定目錄 etc/, License 需自行申請試用或通過購買授權獲取
cp ../emqx.lic ./etc
## 以 console 模式啟動 EMQ X
./bin/emqx console
修改配置
本文中需要用到的配置文件如下:
- License 文件,EMQ X 企業版 License 文件,使用可用的 License 覆蓋:
etc/emqx.lic
- EMQ X Kafka 消息存儲插件配置文件,用於配置 Kafka 連接信息、數據橋接主題:
etc/plugins/emqx_bridge_kafka.conf
根據部署實際情況填寫插件配置信息如下,其余配置項請熟讀配置文件做出調整或直接使用默認配置即可:
## 連接地址
bridge.kafka.servers = 127.0.0.1:9092
## 需要處理的 Hooks 由於我們使用 QoS 1 的進行消息傳送,可以使用 ack hooks
## 注釋其他無關事件、消息 Hooks
## bridge.kafka.hook.client.connected.1 = {"topic":"client_connected"}
## bridge.kafka.hook.client.disconnected.1 = {"topic":"client_disconnected"}
## bridge.kafka.hook.session.subscribed.1 = {"filter":"#", "topic":"session_subscribed"}
## bridge.kafka.hook.session.unsubscribed.1 = {"filter":"#", "topic":"session_unsubscribed"}
## bridge.kafka.hook.message.deliver.1 = {"filter":"#", "topic":"message_deliver"}
## filter 為需要處理的 MQTT 主題, topoc 為寫入的 Kafka 主題
## 注冊多個 Hooks 實現上行、下行消息處理
## 上報指令選擇 publish hooks
bridge.kafka.hook.message.publish.1 = {"filter":"devices/+/state", "topic":"message_state"}
## 下發指令選擇 acked hooks,確保消息抵達才入庫
bridge.kafka.hook.message.acked.1 = {"filter":"devices/+/command", "topic":"message_command"}
Kafka 安裝與初始化
通過 Docker 進行安裝 Kafka,映射數據 9092
端口供連接使用,Kafka 依賴 Zookeeper,下面提供完整安裝命令:
## 安裝 Zookeeper
docker run -d --name zookeeper -p 2181 -t wurstmeister/zookeeper
## 安裝並配置 Kafka
docker run -d --name kafka --publish 9092:9092 \
--link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 \
--env KAFKA_ADVERTISED_HOST_NAME=127.0.0.1 \
--env KAFKA_ADVERTISED_PORT=9092 \
wurstmeister/kafka:latest
預先在 Kafka 創建需要使用的主題:
## 進入 Kafka Docker 容器
docker exec -it kafka bash
## 上行數據主題 message_state
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic message_state
## 下行數據主題 message_command
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic message_command
至此,可以重啟 EMQ X 並啟動插件以應用以上配置:
./bin/emqx stop
./bin/emqx start
## 或使用 console 模式可以看到更多信息
./bin/emqx console
## 啟動插件
./bin/emqx_ctl plugins load emqx_bridge_kafka
## 啟動成功后會有以下提示
Plugin load emqx_bridge_kafka loaded successfully.
模擬測試
使用 kafka-console-consumer 啟動消費
該方案中三個業務環節詳細實現本文不再贅述,本文僅需保證消息寫入 Kafka 即可,可以使用 Kafka 自帶的消費命令查看主題內的數據:
## 進入 Kafka Docker 容器
docker exec -it kafka bash
## 上行數據主題
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic message_state --from-beginning
## 開啟另外一個窗口查看下行數據主題
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic message_command --from-beginning
命令成功執行后將阻塞等待消費該主題的數據,我們繼續后續操作。
模擬測試數據收發
通過 EMQ X 管理控制台中的 WebSocket 工具可以模擬智能門鎖上/下行業務數據。瀏覽器打開 http://127.0.0.1:1883
進入 EMQ X 管理控制台,打開 Tool -> WebSocket 功能,輸入連接信息建立 MQTT 連接模擬門鎖設備。連接信息里 Client ID 根據業務指定,本文使用 10083618796833171
。
訂閱下行控制主題
根據業務需求,需訂閱門鎖專屬下行控制主題 devices/{client_id}/command
,此處需訂閱 devices/10083618796833171/command
主題並設置 QoS = 1:
模擬下發指令
向門鎖控制主題 devices/{client_id}/command
發送開鎖指令,此處下發數據為:
-
主題:
devices/10083618796833171/command
-
QoS:1
-
payload:
{ "process_id": "7802441525528958", "action": "unlock", "nonce_str": "u7u4p0n8", "ts": 1574744434, "sign": "e9f5af7deaa28563" }
下發成功后管理控制台 Publish 界面可以收到一條消息:
同時 Kafka message_command
主題消費者將收到一條或多條消息(EMQ X ack hooks 觸發次數以實際收到消息客戶端數量為准),消息為 JSON 格式,內容經格式化后如下:
{
"client_id": "10083618796833171",
"username": "",
"from": "10083618796833171",
"topic": "devices/10083618796833171/command",
"payload": "eyAgICJwcm9jZXNzX2lkIjogIjc4MDI0NDE1MjU1Mjg5NTgiLCAgICJhY3Rpb24iOiAidW5sb2NrIiwgICAibm9uY2Vfc3RyIjogInU3dTRwMG44IiwgICAidHMiOiAxNTc0NzQ0NDM0LCAgICJzaWduIjogImU5ZjVhZjdkZWFhMjg1NjMiIH0=",
"qos": 1,
"node": "emqx@127.0.0.1",
"ts": 1574751635845
}
該條消息包含了 MQTT 接收/發布客戶端信息與 Base64 編碼后的 Payload 數據:
- client_id: 接收客戶端 client_id
- username: 接受客戶端 username
- from: 發布客戶端 client_id
- topic: 消息發布目標主題
- payload: 經 Base64 編碼后的消息 Payload
- qos: 消息 QoS
- node: 消息處理節點
- ts: hooks 毫秒級觸發時間戳
模擬上報狀態
向門鎖控制主題 devices/{client_id}/state
發送狀態數據,此處發布數據為:
-
主題:
devices/10083618796833171/state
-
QoS:1
-
payload:
{ "process_id": "7802441525528958", "action": "unlock", "battery": 83.4, "lock_state": 1, "version": 1.1, "client_id": "10083618796833171" }
上報成功后 Kafka message_state
消費者將收到一條消息(EMQ X publish hooks 觸發次數與發布消息有關,與消息主題是否被訂閱以及訂閱數量無關),消息為 JSON 格式,內容經格式化后如下:
{
"client_id": "10083618796833171",
"username": "",
"topic": "devices/10083618796833171/state",
"payload": "eyAgICJwcm9jZXNzX2lkIjogIjc4MDI0NDE1MjU1Mjg5NTgiLCAgICJhY3Rpb24iOiAidW5sb2NrIiwgICAiYmF0dGVyeSI6IDgzLjQsICAgImxvY2tfc3RhdGUiOiAxLCAgICJ2ZXJzaW9uIjogMS4xLCAgICJjbGllbnRfaWQiOiAiMTAwODM2MTg3OTY4MzMxNzEiIH0=",
"qos": 1,
"node": "emqx@127.0.0.1",
"ts": 1574753026269
}
該條消息僅包含 MQTT 發布客戶端信息與 Base64 編碼后的 Payload 數據:
- client_id: 發布客戶端 client_id
- username:發布客戶端 username
- topic: 消息發布目標主題
- payload: 經 Base64 編碼后的消息 Payload
- qos: 消息 QoS
- node: 消息處理節點
- ts: hooks 毫秒級觸發時間戳
至此,我們成功完成 EMQ X 橋接消息至 Kafka 所有步驟,業務系統接入 Kafka 后可以根據消費到的消息數量、消息發布者/訂閱者的 client_id 以及消息 payload 內容進行業務判斷,實現所需業務功能。
性能測試
如果讀者對該方案的性能感興趣,可以采用 MQTT-JMeter 插件對其進行測試。需要注意的是,讀者需要在性能測試過程中保證做好 EMQ 集群、Kafka 集群、Kafka 的消費者,以及 JMeter 測試集群相關的優化與配置,才可以得到相關配置下正確的最佳性能測試結果。
總結
通過本文讀者可以了解到 EMQ X + Kafka 物聯網消息處理方案為消息通信與業務處理帶來的重要作用,利用該方案可以搭建松耦合、高性能、高容錯的物聯網消息處理平台,實現數據高效、安全地處理。
本文編碼實現具體的業務邏輯,讀者可以根據本文提供的業務原型與系統架構進行擴展。由於 RabbitMQ、RocketMQ、Pulsar 等 EMQ X 已經支持的消息/流處理中間的在物聯網項目中集成的架構思想與 Kafka 相近,讀者也可以以本文作為參考,根據自身技術棧自由選用相關組件進行方案集成。