EMQ X:規則引擎


簡介

EMQ X Rule Engine (以下簡稱規則引擎) 用於配置 EMQ X 消息流與設備事件的處理、響應規則。規則引擎不僅提供了清晰、靈活的 "配置式" 的業務集成方案,簡化了業務開發流程,提升用戶易用性,降低業務系統與 EMQ X 的耦合度;也為 EMQ X 的私有功能定制提供了一個更優秀的基礎架構。

image-20210730102724100

EMQ X 在 消息發布或事件觸發 時將觸發規則引擎,滿足觸發條件的規則將執行各自的 SQL 語句篩選並處理消息和事件的上下文信息。

消息發布

規則引擎借助響應動作可將特定主題的消息處理結果存儲到數據庫,發送到 HTTP Server,轉發到消息隊列 Kafka 或 RabbitMQ,重新發布到新的主題甚至是另一個 Broker 集群中,每個規則可以配置多個響應動作。

選擇發布到 t/# 主題的消息,並篩選出全部字段:

SELECT * FROM "t/#"

選擇發布到 t/a 主題的消息,並從 JSON 格式的消息內容中篩選出 "x" 字段:

SELECT payload.x as x FROM "t/a"

事件觸發

規則引擎使用 $events/ 開頭的虛擬主題(事件主題)處理 EMQ X 內置事件,內置事件提供更精細的消息控制和客戶端動作處理能力,可用在 QoS1 QoS2 的消息抵達記錄、設備上下線記錄等業務中。

選擇客戶端連接事件,篩選 Username 為 'emqx' 的設備並獲取連接信息:

SELECT clientid, connected_at FROM "$events/client_connected" WHERE username = 'emqx'

規則引擎典型應用場景舉例

  • 動作監聽:智慧家庭智能門鎖開發中,門鎖會因為網絡、電源故障、人為破壞等原因離線導致功能異常,使用規則引擎配置監聽離線事件向應用服務推送該故障信息,可以在接入層實現第一時間的故障檢測的能力;
  • 數據篩選:車輛網的卡車車隊管理,車輛傳感器采集並上報了大量運行數據,應用平台僅關注車速大於 40 km/h 時的數據,此場景下可以使用規則引擎對消息進行條件過濾,向業務消息隊列寫入滿足條件的數據;
  • 消息路由:智能計費應用中,終端設備通過不同主題區分業務類型,可通過配置規則引擎將計費業務的消息接入計費消息隊列並在消息抵達設備端后發送確認通知到業務系統,非計費信息接入其他消息隊列,實現業務消息路由配置;
  • 消息編解碼:其他公共協議 / 私有 TCP 協議接入、工控行業等應用場景下,可以通過規則引擎的本地處理函數(可在 EMQ X 上定制開發)做二進制 / 特殊格式消息體的編解碼工作;亦可通過規則引擎的消息路由將相關消息流向外部計算資源如函數計算進行處理(可由用戶自行開發處理邏輯),將消息轉為業務易於處理的 JSON 格式,簡化項目集成難度、提升應用快速開發交付能力。

規則引擎組成

使用 EMQ X 的規則引擎可以靈活地處理消息和事件。使用規則引擎可以方便地實現諸如將消息轉換成指定格式,然后存入數據庫表,或者發送到消息隊列等。

與 EMQ X 規則引擎相關的概念包括: 規則(rule)、動作(action)、資源(resource) 和 資源類型(resource-type)。

規則、動作、資源的關系:

規則: {
    SQL 語句,
    動作列表: [
        {
            動作1,
            動作參數,
            綁定資源: {
                資源配置
            }
        },
        {
            動作2,
            動作參數,
            綁定資源: {
                資源配置
            }
         }
    ]
}
  • 規則(Rule): 規則由 SQL 語句和動作列表組成。動作列表包含一個或多個動作及其參數。
  • SQL 語句用於篩選或轉換消息中的數據。
  • 動作(Action) 是 SQL 語句匹配通過之后,所執行的任務。動作定義了一個針對數據的操作。 動作可以綁定資源,也可以不綁定。例如,“inspect” 動作不需要綁定資源,它只是簡單打印數據內容和動作參數。而 “data_to_webserver” 動作需要綁定一個 web_hook 類型的資源,此資源中配置了 URL。
  • 資源(Resource): 資源是通過資源類型為模板實例化出來的對象,保存了與資源相關的配置(比如數據庫連接地址和端口、用戶名和密碼等) 和系統資源(如文件句柄,連接套接字等)。
  • 資源類型 (Resource Type): 資源類型是資源的靜態定義,描述了此類型資源需要的配置項。

動作和資源類型是由 emqx 或插件的代碼提供的,不能通過 API 和 CLI 動態創建。

SQL 語句

SQL 語法

FROM、SELECT 和 WHERE 子句:

規則引擎的 SQL 語句基本格式為:

SELECT <字段名> FROM <主題> [WHERE <條件>]
  • FROM 子句將規則掛載到某個主題上
  • SELECT 子句用於對數據進行變換,並選擇出感興趣的字段
  • WHERE 子句用於對 SELECT 選擇出來的某個字段施加條件過濾

FOREACH、DO 和 INCASE 子句:

如果對於一個數組數據,想針對數組中的每個元素分別執行一些操作並執行 Actions,需要使用 FOREACH-DO-INCASE 語法。其基本格式為:

FOREACH <字段名> [DO <條件>] [INCASE <條件>] FROM <主題> [WHERE <條件>]
  • FOREACH 子句用於選擇需要做 foreach 操作的字段,注意選擇出的字段必須為數組類型
  • DO 子句用於對 FOREACH 選擇出來的數組中的每個元素進行變換,並選擇出感興趣的字段
  • INCASE 子句用於對 DO 選擇出來的某個字段施加條件過濾

其中 DO 和 INCASE 子句都是可選的。DO 相當於針對當前循環中對象的 SELECT 子句,而 INCASE 相當於針對當前循環中對象的 WHERE 語句。

SQL 語句示例:

基本語法舉例

  • 從 topic 為 "t/a" 的消息中提取所有字段:
SELECT * FROM "t/a"
  • 從 topic 為 "t/a" 或 "t/b" 的消息中提取所有字段:
SELECT * FROM "t/a","t/b"
  • 從 topic 能夠匹配到 't/#' 的消息中提取所有字段。
SELECT * FROM "t/#"
  • 從 topic 能夠匹配到 't/#' 的消息中提取 qos, username 和 clientid 字段:
SELECT qos, username, clientid FROM "t/#"
  • 從任意 topic 的消息中提取 username 字段,並且篩選條件為 username = 'Steven':
SELECT username FROM "#" WHERE username='Steven'
  • 從任意 topic 的 JSON 消息體(payload) 中提取 x 字段,並創建別名 x 以便在 WHERE 子句中使用。WHERE 子句限定條件為 x = 1。下面這個 SQL 語句可以匹配到消息體 {"x": 1}, 但不能匹配到消息體 {"x": 2}:
SELECT payload as p FROM "#" WHERE p.x = 1
  • 類似於上面的 SQL 語句,但嵌套地提取消息體中的數據,下面的 SQL 語句可以匹配到 JSON 消息體 {"x": {"y": 1}}:
SELECT payload as a FROM "#" WHERE a.x.y = 1
  • 在 clientid = 'c1' 嘗試連接時,提取其來源 IP 地址和端口號:
SELECT peername as ip_port FROM "$events/client_connected" WHERE clientid = 'c1'
  • 篩選所有訂閱 't/#' 主題且訂閱級別為 QoS1 的 clientid:
SELECT clientid FROM "$events/session_subscribed" WHERE topic = 't/#' and qos = 1
  • 篩選所有訂閱主題能匹配到 't/#' 且訂閱級別為 QoS1 的 clientid。注意與上例不同的是,這里用的是主題匹配操作符 '=~',所以會匹配訂閱 't' 或 't/+/a' 的訂閱事件:
SELECT clientid FROM "$events/session_subscribed" WHERE topic =~ 't/#' and qos = 1
  • FROM 子句后面的主題需要用雙引號 "" 引起來。
  • WHERE 子句后面接篩選條件,如果使用到字符串需要用單引號 '' 引起來。
  • FROM 子句里如有多個主題,需要用逗號 "," 分隔。例如 SELECT * FROM "t/1", "t/2" 。
  • 可以使用使用 "." 符號對 payload 進行嵌套選擇。

遍歷語法(FOREACH-DO-INCASE) 舉例

假設有 ClientID 為 c_steve、主題為 t/1 的消息,消息體為 JSON 格式,其中 sensors 字段為包含多個 Object 的數組:

{
    "date": "2020-04-24",
    "sensors": [
        {"name": "a", "idx":0},
        {"name": "b", "idx":1},
        {"name": "c", "idx":2}
    ]
}

示例1: 要求將 sensors 里的各個對象,分別作為數據輸入重新發布消息到 sensors/${idx} 主題,內容為 ${name}。即最終規則引擎將會發出 3 條消息:

  1. 主題:sensors/0 內容:a
  2. 主題:sensors/1 內容:b
  3. 主題:sensors/2 內容:c

要完成這個規則,我們需要配置如下動作:

  • 動作類型:消息重新發布 (republish)
  • 目的主題:sensors/${idx}
  • 目的 QoS:0
  • 消息內容模板:${name}

以及如下 SQL 語句:

FOREACH
    payload.sensors
FROM "t/#"

示例解析:

這個 SQL 中,FOREACH 子句指定需要進行遍歷的數組 sensors,則選取結果為:

[
  {
    "name": "a",
    "idx": 0
  },
  {
    "name": "b",
    "idx": 1
  },
  {
    "name": "c",
    "idx": 2
  }
]

FOREACH 語句將會對於結果數組里的每個對象分別執行 "消息重新發布" 動作,所以將會執行重新發布動作 3 次。

示例2: 要求將 sensors 里的 idx 值大於或等於 1 的對象,分別作為數據輸入重新發布消息到 sensors/${idx} 主題,內容為 clientid=${clientid},name=${name},date=${date}。即最終規則引擎將會發出 2 條消息:

  1. 主題:sensors/1 內容:clientid=c_steve,name=b,date=2020-04-24
  2. 主題:sensors/2 內容:clientid=c_steve,name=c,date=2020-04-24

要完成這個規則,我們需要配置如下動作:

  • 動作類型:消息重新發布 (republish)
  • 目的主題:sensors/${idx}
  • 目的 QoS:0
  • 消息內容模板:clientid=${clientid},name=${name},date=${date}

以及如下 SQL 語句:

FOREACH
    payload.sensors
DO
    clientid,
    item.name as name,
    item.idx as idx
INCASE
    item.idx >= 1
FROM "t/#"

示例解析:

這個 SQL 中,FOREACH 子句指定需要進行遍歷的數組 sensors; DO 子句選取每次操作需要的字段,這里我們選了外層的 clientid 字段,以及當前 sensor 對象的 nameidx 兩個字段,注意 item 代表 sensors 數組中本次循環的對象。INCASE 子句是針對 DO 語句中字段的篩選條件,僅僅當 idx >= 1 滿足條件。所以 SQL 的選取結果為:

[
  {
    "name": "b",
    "idx": 1,
    "clientid": "c_emqx"
  },
  {
    "name": "c",
    "idx": 2,
    "clientid": "c_emqx"
  }
]

FOREACH 語句將會對於結果數組里的每個對象分別執行 "消息重新發布" 動作,所以將會執行重新發布動作 2 次。

在 DO 和 INCASE 語句里,可以使用 item 訪問當前循環的對象,也可以通過在 FOREACH 使用 as 語法自定義一個變量名。所以本例中的 SQL 語句又可以寫為:

FOREACH
    payload.sensors as s
DO
    clientid,
    s.name as name,
    s.idx as idx
INCASE
    s.idx >= 1
FROM "t/#"

示例3: 在示例2 的基礎上,去掉 clientid 字段 c_steve 中的 c_ 前綴

在 FOREACH 和 DO 語句中可以調用各類 SQL 函數,若要將 c_steve 變為 steve,則可以把例2 中的 SQL 改為:

FOREACH
    payload.sensors as s
DO
    nth(2, tokens(clientid,'_')) as clientid,
    s.name as name,
    s.idx as idx
INCASE
    s.idx >= 1
FROM "t/#"

另外,FOREACH 子句中也可以放多個表達式,只要最后一個表達式是指定要遍歷的數組即可。比如我們將消息體改一下,sensors 外面多套一層 Object:

{
    "date": "2020-04-24",
    "data": {
        "sensors": [
            {"name": "a", "idx":0},
            {"name": "b", "idx":1},
            {"name": "c", "idx":2}
        ]
    }
}

則 FOREACH 中可以在決定要遍歷的數組之前把 data 選取出來:

FOREACH
    payload.data as data
    data.sensors as s
...

CASE-WHEN 語法示例

示例1: 將消息中 x 字段的值范圍限定在 0~7 之間。

SELECT
  CASE WHEN payload.x < 0 THEN 0
       WHEN payload.x > 7 THEN 7
       ELSE payload.x
  END as x
FROM "t/#"

假設消息為:

{"x": 8}

則上面的 SQL 輸出為:

{"x": 7}

數組操作語法舉例

示例1: 創建一個數組,賦值給變量 a:

SELECT
  [1,2,3] as a
FROM
  "t/#"

下標從 1 開始,上面的 SQL 輸出為:

{
  "a": [1, 2, 3]
}

示例2: 從數組中取出第 N 個元素。下標為負數時,表示從數組的右邊取:

SELECT
  [1,2,3] as a,
  a[2] as b,
  a[-2] as c
FROM
  "t/#"

上面的 SQL 輸出為:

{
  "b": 2,
  "c": 2,
  "a": [1, 2, 3]
}

示例3: 從 JSON 格式的 payload 中嵌套的獲取值:

SELECT
  payload.data[1].id as id
FROM
  "t/#"

假設消息為:

{"data": [
    {"id": 1, "name": "steve"},
    {"id": 2, "name": "bill"}
]}

則上面的 SQL 輸出為:

{"id": 1}

示例4: 數組范圍(range)操作:

SELECT
  [1..5] as a,
  a[2..4] as b
FROM
  "t/#"

上面的 SQL 輸出為:

{
  "b": [2, 3, 4],
  "a": [1, 2, 3, 4, 5]
}

示例5: 使用下標語法修改數組中的某個元素:

SELECT
  payload,
  'STEVE' as payload.data[1].name
FROM
  "t/#"

假設消息為:

{"data": [
    {"id": 1, "name": "steve"},
    {"id": 2, "name": "bill"}
]}

則上面的 SQL 輸出為:

{
  "payload": {
    "data": [
      {"name": "STEVE", "id": 1},
      {"name": "bill", "id": 2}
    ]
  }
}

FROM 子句可用的事件主題

事件主題名 釋義
$events/message_delivered 消息投遞
$events/message_acked 消息確認
$events/message_dropped 消息丟棄
$events/client_connected 連接完成
$events/client_disconnected 連接斷開
$events/session_subscribed 訂閱
$events/session_unsubscribed 取消訂閱

SELECT 和 WHERE 子句可用的字段

SELECT 和 WHERE 子句可用的字段與事件的類型相關。其中 clientid, usernameevent 是通用字段,每種事件類型都有

普通主題 (消息發布)

event 事件類型,固定為 "message.publish"
id MQTT 消息 ID
clientid Client ID
username 用戶名
payload MQTT 消息體
peerhost 客戶端的 IPAddress
topic MQTT 主題
qos MQTT 消息的 QoS
flags MQTT 消息的 Flags
headers MQTT 消息內部與流程處理相關的額外數據
timestamp 事件觸發時間 (ms)
publish_received_at PUBLISH 消息到達 Broker 的時間 (ms)
node 事件觸發所在節點

$events/message_delivered (消息投遞)

event 事件類型,固定為 "message.delivered"
id MQTT 消息 ID
from_clientid 消息來源 Client ID
from_username 消息來源用戶名
clientid 消息目的 Client ID
username 消息目的用戶名
payload MQTT 消息體
peerhost 客戶端的 IPAddress
topic MQTT 主題
qos MQTT 消息的 QoS
flags MQTT 消息的 Flags
timestamp 事件觸發時間 (ms)
publish_received_at PUBLISH 消息到達 Broker 的時間 (ms)
node 事件觸發所在節點

$events/message_acked (消息確認)

event 事件類型,固定為 "message.acked"
id MQTT 消息 ID
from_clientid 消息來源 Client ID
from_username 消息來源用戶名
clientid 消息目的 Client ID
username 消息目的用戶名
payload MQTT 消息體
peerhost 客戶端的 IPAddress
topic MQTT 主題
qos MQTT 消息的 QoS
flags MQTT 消息的 Flags
timestamp 事件觸發時間 (ms)
publish_received_at PUBLISH 消息到達 Broker 的時間 (ms)
node 事件觸發所在節點

$events/message_dropped (消息丟棄)

event 事件類型,固定為 "message.dropped"
id MQTT 消息 ID
reason 消息丟棄原因
clientid 消息目的 Client ID
username 消息目的用戶名
payload MQTT 消息體
peerhost 客戶端的 IPAddress
topic MQTT 主題
qos MQTT 消息的 QoS
flags MQTT 消息的 Flags
timestamp 事件觸發時間 (ms)
publish_received_at PUBLISH 消息到達 Broker 的時間 (ms)
node 事件觸發所在節點

$events/client_connected (終端連接成功)

event 事件類型,固定為 "client.connected"
clientid 消息目的 Client ID
username 消息目的用戶名
mountpoint 主題掛載點(主題前綴)
peername 終端的 IPAddress 和 Port
sockname emqx 監聽的 IPAddress 和 Port
proto_name 協議名字
proto_ver 協議版本
keepalive MQTT 保活間隔
clean_start MQTT clean_start
expiry_interval MQTT Session 過期時間
is_bridge 是否為 MQTT bridge 連接
connected_at 終端連接完成時間 (s)
timestamp 事件觸發時間 (ms)
node 事件觸發所在節點

$events/client_disconnected (終端連接斷開)

event 事件類型,固定為 "client.disconnected"
reason 終端連接斷開原因: normal:客戶端主動斷開 kicked:服務端踢出,通過 REST API keepalive_timeout: keepalive 超時 not_authorized: 認證失敗,或者 acl_nomatch = disconnect 時沒有權限的 Pub/Sub 會主動斷開客戶端 tcp_closed: 協議錯誤 internal_error: 畸形報文解析出錯
clientid 消息目的 Client ID
username 消息目的用戶名
peername 終端的 IPAddress 和 Port
sockname emqx 監聽的 IPAddress 和 Port
disconnected_at 終端連接斷開時間 (s)
timestamp 事件觸發時間 (ms)
node 事件觸發所在節點

$events/session_subscribed (終端訂閱成功)

event 事件類型,固定為 "session.subscribed"
clientid 消息目的 Client ID
username 消息目的用戶名
peerhost 客戶端的 IPAddress
topic MQTT 主題
qos MQTT 消息的 QoS
timestamp 事件觸發時間 (ms)
node 事件觸發所在節點

$events/session_unsubscribed (取消終端訂閱成功)

event 事件類型,固定為 "session.unsubscribed"
clientid 消息目的 Client ID
username 消息目的用戶名
peerhost 客戶端的 IPAddress
topic MQTT 主題
qos MQTT 消息的 QoS
timestamp 事件觸發時間 (ms)
node 事件觸發所在節點

SQL 關鍵字和符號

SELECT - FROM - WHERE 語句

SELECT 語句用於決定最終的輸出結果里的字段。比如:

下面 SQL 的輸出結果中將只有兩個字段 "a" 和 "b":

SELECT a, b FROM "t/#"

WHERE 語句用於對本事件中可用字段,或 SELECT 語句中定義的字段進行條件過濾。比如:

# 選取 username 為 'abc' 的終端發來的消息,輸出結果為所有可用字段:

SELECT * FROM "#" WHERE username = 'abc'

## 選取 clientid 為 'abc' 的終端發來的消息,輸出結果將只有 cid 一個字段。
## 注意 cid 變量是在 SELECT 語句中定義的,故可在 WHERE 語句中使用:

SELECT clientid as cid FROM "#" WHERE cid = 'abc'

## 選取 username 為 'abc' 的終端發來的消息,輸出結果將只有 cid 一個字段。
## 注意雖然 SELECT 語句中只選取了 cid 一個字段,所有消息發布事件中的可用字段 (比如 clientid, username 等) 仍然可以在 WHERE 語句中使用:

SELECT clientid as cid FROM "#" WHERE username = 'abc'

## 但下面這個 SQL 語句就不能工作了,因為變量 xyz 既不是消息發布事件中的可用字段,又沒有在 SELECT 語句中定義:

SELECT clientid as cid FROM "#" WHERE xyz = 'abc'

FROM 語句用於選擇事件來源。如果是消息發布則填寫消息的主題,如果是事件則填寫對應的事件主題。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM