配置實例
本篇提供兩個示例,通過 Dashboard 可視化界面演示規則引擎的創建於使用。
示例一:通過 Web Server 持久化消息到磁盤/數據庫
場景描述
該場景中擬設車聯網卡車車載傳感器通過 /monitor/:device_id/state
主題上報如下 JSON 消息(device_id 為車輛連接客戶端的 client_id,同車輛 ID):
{
"speed": 20, // 實時車速(千米/小時)
"lng": 102.8622543812, // 位置經度
"lat": 24.8614503916, // 位置緯度
"load": 1200101 // 載重量(千克)
}
規則引擎需要將車速大於 60 km/h 的數據發送到 Web Server 進行持久化處理,以便后期結合地理位置進行是否超速判定。
使用 Web Server 持久化設備消息從吞吐性能與消息一致性上考量都略顯不足,此處僅為規則引擎體驗示例,如有相關場景請嘗試數據橋接、直接持久化到數據庫等方案。
准備
編寫 HTTP 接口,准備接收並處理規則引擎的消息
該部分示例代碼如下:
'use strict'
const http = require('http')
const execSync = require('child_process').execSync
// 初始化全局變量用於計數
let msg_num = 0
http.createServer((req, res) => {
const { token } = req.headers
console.log('message coming', 'token:', token)
// 簡單的認證
if (!token || token !== 'web_token') {
return res.end('-1')
}
let body = ''
req.on('data', (data) => {
body = body + data
})
req.on('end', () => {
body = body.toString()
try {
const message = JSON.parse(body)
// 附加時間戳
/** @type {number} */
message.ts = Date.now()
message.index = msg_num
// 持久化數據到磁盤,實際根據業務處理
execSync(`echo '${JSON.stringify(message)}' >> message.log`)
msg_num = msg_num + 1
res.end(msg_num.toString())
} catch (e) {
res.end('-1')
}
})
}).listen(8888, () => {
console.log('Listen on 8888')
}) // 監聽 8888 端口
本地啟動服務
使用 Node.js 快速在本地啟動服務器
node app.js
> Listen on 8888
此處使用依賴極簡代碼示例,實際開發中應當有完備的權限校驗、數據校驗操作。
在資源中創建持久化 API 接口
在 Dashboard --> 規則引擎 --> 資源 頁面點擊右上角,點擊 新建 按鈕,選擇 WebHook 資源類型,填入接入地址與認證信息:
創建規則
資源創建完畢后我們可以進行規則創建,規則引擎 --> 規則 頁面中點擊 新建 按鈕進入規則創建頁面。
觸發事件選擇
選擇 消息發布 事件,處理卡車消息上報(發布)時的數據。本示例中我們需要存儲的消息如下:
{
"speed": 20, // 實時車速(千米/小時)
"lng": 102.8622543812, // 位置經度
"lat": 24.8614503916, // 位置緯度
"device_id": "" // 車輛 ID 信息
}
根據 可用字段 提示,device_id
字段相當於 client_id 可以從上下文中選取,speed
等信息則從 payload
中選取,規則 SQL 如下:
SELECT
payload.speed AS speed,
payload.lng AS lng,
payload.lat AS lat,
client_id AS device_id
FROM "message.publish"
該條規則默認處理全部的消息,實際上業務僅需處理 /monitor/+/state
主題下的消息(使用了主題通配符),且 speed
的值應當大於 60,我們給規則加上限定條件:
SELECT
payload.speed AS speed,
payload.lng AS lng,
payload.lat AS lat,
client_id AS device_id
FROM "message.publish"
WHERE
topic =~ '/monitor/+/state' and
speed > 60
使用 SQL 測試功能,輸入原始上報數據與相關變量,設置 speed > 60
之后,得到如下輸出結果:
{
"speed": 89,
"lng": 102.8622543812,
"lat": 24.8614503916,
"device_id": "emqx_c"
}
-
將消息發送到 Web Server
新建響應動作並選取 發送數據到 Web 服務,選擇准備工作中創建的資源,保存該條規則。
示例測試
我們成功創建了一條規則,一共包含一個處理動作,動作期望效果如下:
- 向
/monitor/+/state
主題發布消息時,當消息體是符合預期的 JSON 格式且speed
數值大於 60,規則將命中並向 Web Server 處理后的消息,Web Server 根目錄下message.log
文件將新增新增寫入該條數據。
- 向
使用 Dashboard 中的 Websocket 工具測試
切換到 工具 --> Websocket 頁面,客戶端 ID,用戶名,密碼均填寫 emqx_c
模擬設備接入:
連接成功后向 /monitor/emqx_c/state
主題發送如下消息:
{
"speed": 20,
"lng": 102.8622543812,
"lat": 24.8614503916,
"load": 1200101
}
由於 speed
小於預設的 60,查看持久化文件 message.log
該條消息並未命中規則。
調整 speed
值為 90,單擊發送按鈕三次,查看文件 message.log
中持久化的消息內容如下:
{"speed":90,"lng":102.8622543812,"lat":24.8614503916,"device_id":"emqx_c","ts":1559711462746,"index":0}
{"speed":90,"lng":102.8622543812,"lat":24.8614503916,"device_id":"emqx_c","ts":1559711474487,"index":1}
{"speed":90,"lng":102.8622543812,"lat":24.8614503916,"device_id":"emqx_c","ts":1559711475219,"index":2}
至此,我們實現了通過 Web Server 持久化消息到磁盤的業務開發。
示例二:設備在線狀態記錄與上下線通知
場景描述
該場景中需要標記接入 EMQ X 的設備在線狀態,在 MySQL 中記錄設備上下線日志,同時設備下線時通過 HTTP API 通知告警系統。
MySQL 部分功能僅限企業版
准備
初始化 MySQL 設備表 devices
與 連接記錄表 device_connect_log
-- 設備表
CREATE TABLE `emqx`.`devices` (
`id` INT NOT NULL,
`client_id` VARCHAR(255) NOT NULL AUTO_INCREMENT COMMENT '客戶端 ID',
`state` TINYINT(3) NOT NULL DEFAULT 0 COMMENT '狀態 0 離線 1 在線',
`connected_at` VARCHAR(45) NULL COMMENT '連接時間,毫秒級時間戳',
PRIMARY KEY (`id`));
-- 初始化數據
INSERT INTO `emqx`.`devices` (`client_id`) VALUES ('emqx_c');
-- 連接記錄表
CREATE TABLE `emqx`.`device_connect_log` (
`id` INT NOT NULL,
`client_id` VARCHAR(255) NOT NULL AUTO_INCREMENT COMMENT '客戶端 ID',
`action` TINYINT(3) NOT NULL DEFAULT 0 COMMENT '動作 0 其他 1 上線 2 下線 3 訂閱 4 取消訂閱',
`target` VARCHAR(255) NULL COMMENT '操作目標',
`create_at` VARCHAR(45) NULL COMMENT '記錄時間',
PRIMARY KEY (`id`));
在資源中創建 MySQL 連接
在 Dashboard --> 規則引擎 --> 資源 頁面點擊右上角,點擊 新建 按鈕,選擇 MySQL 資源類型,填入相關參數創建 MySQL 連接資源,保存配置前可點擊 測試連接 進行可用性測試:
在資源中創建告警 API 接口
重復資源創建操作,創建 WehHook 類型的資源用於設備下線通知。此處用戶可根據業務邏輯自行開發告警服務:
創建規則
資源創建完畢后我們可以進行規則創建,規則引擎 --> 規則 頁面中點擊 新建 按鈕進入規則創建頁面。
觸發事件選擇
設備上下、線對應的事件分別是 連接完成 與 連接斷開,首先選擇 連接完成 事件進行上線記錄:
創建上線處理規則
SQL 測試與動作創建:
通過界面上的 可用字段 提示,編寫規則 SQL 語句選取 client_id
與 connected_at
如下:
SELECT client_id, connected_at FROM "client.connected"
點擊 SQL 測試進行 SQL 輸出測試,該條 SQL 執行輸出為:
{
"client_id": "c_emqx",
"connected_at": 1559639502861
}
即響應動作中將拿到上述數據。
新建響應動作並選取 保存數據到 MySQL,選擇准備工作中創建的 MySQL 資源,輸入 SQL 模板 配置該條數據寫入規則,使用類似 ${x}
的魔法變量可以將規則篩選出來的數據替換進 SQL 語句。
根據 client_id
更新設備的 state
為 1,表示設備在線
UPDATE `devices`
SET `state`=1, `connected_at`= ${connected_at}
WHERE `client_id`= ${client_id}
LIMIT 1
再添加一個動作,在設備連接表 中插入一條記錄,記錄設備上線歷史:
INSERT INTO `device_connect_log`
(`client_id`, `action`, `create_at`)
VALUES (${client_id}, '1', ${connected_at});
點擊 新建 完成規則的創建,該條規則包含兩個動作。
創建離線處理規則
上一步中我們已經通過 連接完成 觸發事件完成了設備上線規則的創建,接下來我們完成設備下線規則創建:
觸發事件選擇 連接斷開 ,同樣將 client_id
與 connected_at
選擇出來,規則 SQL 如下:
SELECT client_id, reason_code FROM "client.disconnected"
點擊 SQL 測試進行 SQL 輸出測試,該條 SQL 執行輸出為:
{
"client_id": "c_emqx",
"reason_code": "normal"
}
將設備狀態置為離線並清空上線時間:
新增一個響應動作,選擇 保存數據到 MySQL 並編寫如下 SQL 模板 :
UPDATE `devices`
SET `state`=0, `connected_at`= ''
WHERE `client_id`= ${client_id}
LIMIT 1
設備連接表 中插入一條記錄,記錄設備下線歷史:
繼續新增一個響應動作,這里復用 target
字段,標記下線原因
INSERT INTO `device_connect_log`
(`client_id`, `action`, `target`)
VALUES (${client_id}, '2', ${reason_code});
將下線消息發送到 Web Server,觸發業務系統的設備下線通知:**
將下線消息發送到 Web Server,觸發業務系統的設備下線通知:
新增一個 發送數據到 Web 服務 動作,選擇 准備 步驟中創建的 Web 接入點,消息將以 HTTP 請求發送到該接入點。
點擊 新建 完成規則的創建,該條規則包含三個動作。
示例測試
- 我們成功創建了兩條規則,一共包含五個處理動作,動作期望效果如下:
- 設備上線時,更改數據庫
設備表
的state
字段 為1
,標記設備在線; - 設備上線時,在
連接記錄表
插入一條上線記錄,包含client_id
與create_at
字段,同時設置action
為1
標記這是一條上線記錄; - 設備下線時,更改數據庫
設備表
的state
字段 為0
,標記設備離線; - 設備下線時,在
連接記錄表
插入一條下線記錄,包含client_id
與target
字段(標記下線原因),同時設置action
為2
標記這是一條下線記錄; - 設備下線時,發送一條請求到
https://api.emqx.io/v1/connect_hook
服務網關,網關獲取到下線設備的 client_id 與下線原因,做出相應邏輯通知到業務系統。
- 設備上線時,更改數據庫
使用 Dashboard 中的 Websocket 工具測試
切換到 工具 --> Websocket 頁面,客戶端 ID,用戶名,密碼均填寫 emqx_c
模擬設備接入:
連接成功后,分別查看 設備表 與 連接記錄表 得到以下數據:
設備狀態已被更新,連接記錄表新增一條數據
手動斷開連接,數據表中數據如下:
設備狀態已被更新,連接記錄表新增一條離線數據,告警 API 接口應當收到了設備離線數據,此處不再贅述。
至此,我們通過兩條規則實現了預定的在線狀態切換,上下線記錄與下線告警相關業務開發。