規則引擎使用配置介紹


配置實例

本篇提供兩個示例,通過 Dashboard 可視化界面演示規則引擎的創建於使用。

示例一:通過 Web Server 持久化消息到磁盤/數據庫

場景描述

該場景中擬設車聯網卡車車載傳感器通過 /monitor/:device_id/state 主題上報如下 JSON 消息(device_id 為車輛連接客戶端的 client_id,同車輛 ID):

{
"speed": 20, // 實時車速(千米/小時)
"lng": 102.8622543812, // 位置經度
"lat": 24.8614503916, // 位置緯度
"load": 1200101 // 載重量(千克)
}

規則引擎需要將車速大於 60 km/h 的數據發送到 Web Server 進行持久化處理,以便后期結合地理位置進行是否超速判定。

使用 Web Server 持久化設備消息從吞吐性能與消息一致性上考量都略顯不足,此處僅為規則引擎體驗示例,如有相關場景請嘗試數據橋接、直接持久化到數據庫等方案。

准備

編寫 HTTP 接口,准備接收並處理規則引擎的消息

該部分示例代碼如下:

'use strict'
const http = require('http')
const execSync = require('child_process').execSync
// 初始化全局變量用於計數
let msg_num = 0
http.createServer((req, res) => {

const { token } = req.headers
console.log('message coming', 'token:', token)
// 簡單的認證
if (!token || token !== 'web_token') {
  return res.end('-1')
}
let body = ''
req.on('data', (data) => {
  body = body + data
})
req.on('end', () => {
  body = body.toString()
  try {
    const message = JSON.parse(body)
    // 附加時間戳
    /** @type {number} */
    message.ts = Date.now()
    message.index = msg_num
    // 持久化數據到磁盤,實際根據業務處理
    execSync(`echo '${JSON.stringify(message)}' >> message.log`)
    msg_num = msg_num + 1
    res.end(msg_num.toString())
  } catch (e) {
    res.end('-1')
  }
})
}).listen(8888, () => {
console.log('Listen on 8888')
}) // 監聽 8888 端口

本地啟動服務

使用 Node.js 快速在本地啟動服務器

node app.js
> Listen on 8888

此處使用依賴極簡代碼示例,實際開發中應當有完備的權限校驗、數據校驗操作。

在資源中創建持久化 API 接口

Dashboard --> 規則引擎 --> 資源 頁面點擊右上角,點擊 新建 按鈕,選擇 WebHook 資源類型,填入接入地址與認證信息:

創建規則

資源創建完畢后我們可以進行規則創建,規則引擎 --> 規則 頁面中點擊 新建 按鈕進入規則創建頁面。

觸發事件選擇

選擇 消息發布 事件,處理卡車消息上報(發布)時的數據。本示例中我們需要存儲的消息如下:

{
"speed": 20, // 實時車速(千米/小時)
"lng": 102.8622543812, // 位置經度
"lat": 24.8614503916, // 位置緯度
"device_id": "" // 車輛 ID 信息
}

根據 可用字段 提示,device_id 字段相當於 client_id 可以從上下文中選取,speed 等信息則從 payload 中選取,規則 SQL 如下:

SELECT 
payload.speed AS speed, 
payload.lng AS lng, 
payload.lat AS lat, 
client_id AS device_id 
FROM "message.publish"

該條規則默認處理全部的消息,實際上業務僅需處理 /monitor/+/state 主題下的消息(使用了主題通配符),且 speed 的值應當大於 60,我們給規則加上限定條件:

SELECT 
payload.speed AS speed, 
payload.lng AS lng, 
payload.lat AS lat, 
client_id AS device_id 
FROM "message.publish"
WHERE
topic =~ '/monitor/+/state' and
speed > 60

使用 SQL 測試功能,輸入原始上報數據與相關變量,設置 speed > 60 之后,得到如下輸出結果:

{
"speed": 89,
"lng": 102.8622543812,
"lat": 24.8614503916,
"device_id": "emqx_c"
}
  • 將消息發送到 Web Server

    新建響應動作並選取 發送數據到 Web 服務,選擇准備工作中創建的資源,保存該條規則。

    示例測試

    我們成功創建了一條規則,一共包含一個處理動作,動作期望效果如下:

    • /monitor/+/state 主題發布消息時,當消息體是符合預期的 JSON 格式且 speed 數值大於 60,規則將命中並向 Web Server 處理后的消息,Web Server 根目錄下 message.log 文件將新增新增寫入該條數據。

使用 Dashboard 中的 Websocket 工具測試

切換到 工具 --> Websocket 頁面,客戶端 ID,用戶名,密碼均填寫 emqx_c 模擬設備接入:

連接成功后向 /monitor/emqx_c/state 主題發送如下消息:

{
"speed": 20,
"lng": 102.8622543812,
"lat": 24.8614503916,
"load": 1200101
}

由於 speed 小於預設的 60,查看持久化文件 message.log 該條消息並未命中規則。

調整 speed 值為 90,單擊發送按鈕三次,查看文件 message.log 中持久化的消息內容如下:

{"speed":90,"lng":102.8622543812,"lat":24.8614503916,"device_id":"emqx_c","ts":1559711462746,"index":0}
{"speed":90,"lng":102.8622543812,"lat":24.8614503916,"device_id":"emqx_c","ts":1559711474487,"index":1}
{"speed":90,"lng":102.8622543812,"lat":24.8614503916,"device_id":"emqx_c","ts":1559711475219,"index":2}

至此,我們實現了通過 Web Server 持久化消息到磁盤的業務開發。

示例二:設備在線狀態記錄與上下線通知

場景描述

該場景中需要標記接入 EMQ X 的設備在線狀態,在 MySQL 中記錄設備上下線日志,同時設備下線時通過 HTTP API 通知告警系統。

MySQL 部分功能僅限企業版

准備

初始化 MySQL 設備表 devices 與 連接記錄表 device_connect_log

-- 設備表
CREATE TABLE `emqx`.`devices` (
`id` INT NOT NULL,
`client_id` VARCHAR(255) NOT NULL AUTO_INCREMENT COMMENT '客戶端 ID',
`state` TINYINT(3) NOT NULL DEFAULT 0 COMMENT '狀態 0 離線 1 在線',
`connected_at` VARCHAR(45) NULL COMMENT '連接時間,毫秒級時間戳',
PRIMARY KEY (`id`));

-- 初始化數據

INSERT INTO `emqx`.`devices` (`client_id`) VALUES ('emqx_c');
-- 連接記錄表
CREATE TABLE `emqx`.`device_connect_log` (
`id` INT NOT NULL,
`client_id` VARCHAR(255) NOT NULL AUTO_INCREMENT COMMENT '客戶端 ID',
`action` TINYINT(3) NOT NULL DEFAULT 0 COMMENT '動作 0 其他 1 上線 2 下線 3 訂閱 4 取消訂閱',
`target` VARCHAR(255) NULL COMMENT '操作目標',
`create_at` VARCHAR(45) NULL COMMENT '記錄時間',
PRIMARY KEY (`id`));

在資源中創建 MySQL 連接

Dashboard --> 規則引擎 --> 資源 頁面點擊右上角,點擊 新建 按鈕,選擇 MySQL 資源類型,填入相關參數創建 MySQL 連接資源,保存配置前可點擊 測試連接 進行可用性測試:

在資源中創建告警 API 接口

重復資源創建操作,創建 WehHook 類型的資源用於設備下線通知。此處用戶可根據業務邏輯自行開發告警服務:

創建規則

資源創建完畢后我們可以進行規則創建,規則引擎 --> 規則 頁面中點擊 新建 按鈕進入規則創建頁面。

觸發事件選擇

設備上下、線對應的事件分別是 連接完成連接斷開,首先選擇 連接完成 事件進行上線記錄:

創建上線處理規則

SQL 測試與動作創建:

通過界面上的 可用字段 提示,編寫規則 SQL 語句選取 client_idconnected_at 如下:

SELECT client_id, connected_at FROM "client.connected"

點擊 SQL 測試進行 SQL 輸出測試,該條 SQL 執行輸出為:

{
"client_id": "c_emqx",
"connected_at": 1559639502861
}

即響應動作中將拿到上述數據。

新建響應動作並選取 保存數據到 MySQL,選擇准備工作中創建的 MySQL 資源,輸入 SQL 模板 配置該條數據寫入規則,使用類似 ${x} 的魔法變量可以將規則篩選出來的數據替換進 SQL 語句。

根據 client_id 更新設備的 state 為 1,表示設備在線

UPDATE `devices` 
SET `state`=1, `connected_at`= ${connected_at} 
WHERE `client_id`= ${client_id}
LIMIT 1

再添加一個動作,在設備連接表 中插入一條記錄,記錄設備上線歷史:

INSERT INTO `device_connect_log` 
(`client_id`, `action`, `create_at`) 
VALUES (${client_id}, '1', ${connected_at});

點擊 新建 完成規則的創建,該條規則包含兩個動作。

創建離線處理規則

上一步中我們已經通過 連接完成 觸發事件完成了設備上線規則的創建,接下來我們完成設備下線規則創建:

觸發事件選擇 連接斷開 ,同樣將 client_idconnected_at 選擇出來,規則 SQL 如下:

SELECT client_id, reason_code FROM "client.disconnected"

點擊 SQL 測試進行 SQL 輸出測試,該條 SQL 執行輸出為:

{
"client_id": "c_emqx",
"reason_code": "normal"
}

將設備狀態置為離線並清空上線時間:

新增一個響應動作,選擇 保存數據到 MySQL 並編寫如下 SQL 模板 :

UPDATE `devices` 
SET `state`=0, `connected_at`= '' 
WHERE `client_id`= ${client_id}
LIMIT 1

設備連接表 中插入一條記錄,記錄設備下線歷史:

繼續新增一個響應動作,這里復用 target 字段,標記下線原因

INSERT INTO `device_connect_log` 
(`client_id`, `action`, `target`) 
VALUES (${client_id}, '2', ${reason_code});

將下線消息發送到 Web Server,觸發業務系統的設備下線通知:**

將下線消息發送到 Web Server,觸發業務系統的設備下線通知:

新增一個 發送數據到 Web 服務 動作,選擇 准備 步驟中創建的 Web 接入點,消息將以 HTTP 請求發送到該接入點。

點擊 新建 完成規則的創建,該條規則包含三個動作。

示例測試

  1. 我們成功創建了兩條規則,一共包含五個處理動作,動作期望效果如下:
    1. 設備上線時,更改數據庫 設備表state 字段 為 1,標記設備在線;
    2. 設備上線時,在 連接記錄表 插入一條上線記錄,包含 client_idcreate_at 字段,同時設置 action1 標記這是一條上線記錄;
    3. 設備下線時,更改數據庫 設備表state 字段 為 0,標記設備離線;
    4. 設備下線時,在 連接記錄表 插入一條下線記錄,包含 client_idtarget 字段(標記下線原因),同時設置 action2 標記這是一條下線記錄;
    5. 設備下線時,發送一條請求到 https://api.emqx.io/v1/connect_hook 服務網關,網關獲取到下線設備的 client_id 與下線原因,做出相應邏輯通知到業務系統。

使用 Dashboard 中的 Websocket 工具測試

切換到 工具 --> Websocket 頁面,客戶端 ID,用戶名,密碼均填寫 emqx_c 模擬設備接入:

連接成功后,分別查看 設備表 與 連接記錄表 得到以下數據:

設備狀態已被更新,連接記錄表新增一條數據

手動斷開連接,數據表中數據如下:

設備狀態已被更新,連接記錄表新增一條離線數據,告警 API 接口應當收到了設備離線數據,此處不再贅述。


至此,我們通過兩條規則實現了預定的在線狀態切換,上下線記錄與下線告警相關業務開發。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM