WebHook
WebHook 是由 emqx_web_hook插件提供的 將 EMQ X 中的鈎子事件通知到某個 Web 服務 的功能。
WebHook 的內部實現是基於鈎子,但它更靠近頂層一些。它通過在鈎子上的掛載回調函數,獲取到 EMQ X 中的各種事件,並轉發至 emqx_web_hook 中配置的 Web 服務器。
以 客戶端成功接入(client.connected) 事件為例,其事件的傳遞流程如下:
Client | EMQ X | emqx_web_hook | HTTP +------------+
=============>| - - - - - - -> - - - - - - - ->===========> | Web Server |
| Broker | | Request +------------+
WebHook 對於事件的處理是單向的,它僅支持將 EMQ X 中的事件推送給 Web 服務,並不關心 Web 服務的返回。 借助 Webhook 可以完成設備在線、上下線記錄,訂閱與消息存儲、消息送達確認等諸多業務。
簡單來講,該機制目的在於增強軟件系統的擴展性、方便與其他三方系統的集成、或者改變其系統原有 的默認行為。如下圖:
當系統中不存在 鈎子 (Hooks) 機制時,整個事件處理流程 從 事件 (Event) 的輸入,到 處理 (Handler), 再到完成后的返回結果 (Result) 對於系統外部而講,都是不可見、且無法修改的。
而在這個過程中加入一個可掛載函數的點 (HookPoint),允許外部插件掛載多個回調函數,形成一個調用鏈。達到對內部事件處理過程的擴展和修改。系統中常用到的認證插件則是按照該邏輯進行實現的。
因此,在 EMQ X 中,鈎子 (Hooks) 這種機制極大地方便了系統的擴展。我們不需要修改 emqx 核心代 碼,僅需要在特定的位置埋下掛載點 (HookPoint) ,便能允許外部插件擴展 EMQ X 的各種行為。
對於實現者來說僅需要關注:
- 掛載點 (HookPoint) 的位置:包括其作用、執行的時機、和如何掛載和取消掛載。
- 回調函數 的實現:包括回調函數的入參個數、作用、數據結構等,及返回值代表的含義。
- 了解回調函數在 鏈 上執行的機制:包括回調函數執行的順序,及如何提前終止鏈的執行。
配置項說明
配置文件:/etc/emqx/plugins/emqx_web_hook.conf
web.hook.url:Webhook 請求轉發的目的 Web 服務器地址。
web.hook.encoding_of_payload_field:PUBLISH 報文中 Payload 字段的編碼格式。
觸發規則
配置的格式如下:
## 格式示例
web.hook.rule.<Event>.<Number> = <Rule>
## 示例值
web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}
event:目前支持以下事件
名稱 | 說明 | 執行時機 |
---|---|---|
client.connect | 處理連接報文 | 服務端收到客戶端的連接報文時 |
client.connack | 下發連接應答 | 服務端准備下發連接應答報文時 |
client.connected | 成功接入 | 客戶端認證完成並成功接入系統后 |
client.disconnected | 連接斷開 | 客戶端連接層在准備關閉時 |
client.subscribe | 訂閱主題 | 收到訂閱報文后,執行 client.check_acl 鑒權前 |
client.unsubscribe | 取消訂閱 | 收到取消訂閱報文后 |
session.subscribed | 會話訂閱主題 | 完成訂閱操作后 |
session.unsubscribed | 會話取消訂閱 | 完成取消訂閱操作后 |
message.publish | 消息發布 | 服務端在發布(路由)消息前 |
message.delivered | 消息投遞 | 消息准備投遞到客戶端前 |
message.acked | 消息回執 | 服務端在收到客戶端發回的消息 ACK 后 |
message.dropped | 消息丟棄 | 發布出的消息被丟棄后 |
number:同一個事件可以配置多個觸發規則,配置相同的事件應當依次遞增。
rule:觸發規則
其值為一個 JSON 字符串,其中可用的 Key 有:
- action:字符串,取固定值
- topic:字符串,表示一個主題過濾器,操作的主題只有與該主題匹配才能觸發事件的轉發
例如,我們只將與 a/b/c
和 foo/#
主題匹配的消息轉發到 Web 服務器上,其配置應該為:
web.hook.rule.message.publish.1 = {"action": "on_message_publish", "topic": "a/b/c"}
web.hook.rule.message.publish.2 = {"action": "on_message_publish", "topic": "foo/#"}
這樣 Webhook 僅會轉發與 a/b/c
和 foo/#
主題匹配的消息,例如 foo/bar
等,而不是轉發 a/b/d
或 fo/bar
WebHook事件參數
事件觸發時 Webhook 會按照配置將每個事件組成一個 HTTP 請求發送到 url
所配置的 Web 服務器上。其請求格式為:
URL: <url> # 來自於配置中的 `url` 字段
Method: POST # 固定為 POST 方法
Body: <JSON> # Body 為 JSON 格式字符串
對於不同的事件,請求 Body 體內容有所不同,下表列舉了各個事件中 Body 的參數列表:
client.connect
Key | 類型 | 說明 |
---|---|---|
action | string | 事件名稱 固定為:"client_connect" |
clientid | string | 客戶端 ClientId |
username | string | 客戶端 Username,不存在時該值為 "undefined" |
ipaddress | string | 客戶端源 IP 地址 |
keepalive | integer | 客戶端申請的心跳保活時間 |
proto_ver | integer | 協議版本號 |
client.connack
Key | 類型 | 說明 |
---|---|---|
action | string | 事件名稱 固定為:"client_connack" |
clientid | string | 客戶端 ClientId |
username | string | 客戶端 Username,不存在時該值為 "undefined" |
ipaddress | string | 客戶端源 IP 地址 |
keepalive | integer | 客戶端申請的心跳保活時間 |
proto_ver | integer | 協議版本號 |
conn_ack | string | "success" 表示成功,其它表示失敗的原因 |
client.connected
Key | 類型 | 說明 |
---|---|---|
action | string | 事件名稱 固定為:"client_connected" |
clientid | string | 客戶端 ClientId |
username | string | 客戶端 Username,不存在時該值為 "undefined" |
ipaddress | string | 客戶端源 IP 地址 |
keepalive | integer | 客戶端申請的心跳保活時間 |
proto_ver | integer | 協議版本號 |
connected_at | integer | 時間戳(秒) |
client.disconnected
Key | 類型 | 說明 |
---|---|---|
action | string | 事件名稱 固定為:"client_disconnected" |
clientid | string | 客戶端 ClientId |
username | string | 客戶端 Username,不存在時該值為 "undefined" |
reason | string | 錯誤原因 |
client.subscribe
Key | 類型 | 說明 |
---|---|---|
action | string | 事件名稱 固定為:"client_subscribe" |
clientid | string | 客戶端 ClientId |
username | string | 客戶端 Username,不存在時該值為 "undefined" |
topic | string | 將訂閱的主題 |
opts | json | 訂閱參數 |
opts 包含
Key | 類型 | 說明 |
---|---|---|
qos | enum | QoS 等級,可取 0 1 2 |
client.unsubscribe
Key | 類型 | 說明 |
---|---|---|
action | string | 事件名稱 固定為:"client_unsubscribe" |
clientid | string | 客戶端 ClientId |
username | string | 客戶端 Username,不存在時該值為 "undefined" |
topic | string | 取消訂閱的主題 |
session.subscribed:同 client.subscribe
,action 為 session_subscribed
session.unsubscribed:同 client.unsubscribe
,action 為 session_unsubscribe
session.terminated: 同 client.disconnected
,action 為 session_terminated
message.publish
Key | 類型 | 說明 |
---|---|---|
action | string | 事件名稱 固定為:"message_publish" |
from_client_id | string | 發布端 ClientId |
from_username | string | 發布端 Username,不存在時該值為 "undefined" |
topic | string | 取消訂閱的主題 |
qos | enum | QoS 等級,可取 0 1 2 |
retain | bool | 是否為 Retain 消息 |
payload | string | 消息 Payload |
ts | integer | 消息的時間戳(毫秒) |
message.delivered
Key | 類型 | 說明 |
---|---|---|
action | string | 事件名稱 固定為:"message_delivered" |
clientid | string | 接收端 ClientId |
username | string | 接收端 Username,不存在時該值為 "undefined" |
from_client_id | string | 發布端 ClientId |
from_username | string | 發布端 Username,不存在時該值為 "undefined" |
topic | string | 取消訂閱的主題 |
qos | enum | QoS 等級,可取 0 1 2 |
retain | bool | 是否為 Retain 消息 |
payload | string | 消息 Payload |
ts | integer | 消息時間戳(毫秒) |
message.acked
Key | 類型 | 說明 |
---|---|---|
action | string | 事件名稱 固定為:"message_acked" |
clientid | string | 接收端 ClientId |
from_client_id | string | 發布端 ClientId |
from_username | string | 發布端 Username,不存在時該值為 "undefined" |
topic | string | 取消訂閱的主題 |
qos | enum | QoS 等級,可取 0 1 2 |
retain | bool | 是否為 Retain 消息 |
payload | string | 消息 Payload |
ts | integer | 消息時間戳(毫秒) |
WebHook案例編寫
修改配置文件:
web.hook.url = http://127.0.0.1:8991/mqtt/webhook
web.hook.rule.client.connect.1 = {"action": "on_client_connect"}
web.hook.rule.client.connack.1 = {"action": "on_client_connack"}
web.hook.rule.client.connected.1 = {"action": "on_client_connected"}
web.hook.rule.client.disconnected.1 = {"action": "on_client_disconnected"}
web.hook.rule.client.subscribe.1 = {"action": "on_client_subscribe"}
web.hook.rule.client.unsubscribe.1 = {"action": "on_client_unsubscribe"}
web.hook.rule.session.subscribed.1 = {"action": "on_session_subscribed"}
web.hook.rule.session.unsubscribed.1 = {"action": "on_session_unsubscribed"}
web.hook.rule.session.terminated.1 = {"action": "on_session_terminated"}
web.hook.rule.message.publish.1 = {"action": "on_message_publish"}
web.hook.rule.message.delivered.1 = {"action": "on_message_delivered"}
web.hook.rule.message.acked.1 = {"action": "on_message_acked"}
啟動webhook插件:
重啟emqx:
emqx restart
暴露方法:
@RestController
@RequestMapping("/mqtt")
public class WebHookController {
private static final Logger logger = LoggerFactory.getLogger(WebHookController.class);
private final Map<String,Boolean> clientStatusMap = new HashMap<>();
@PostMapping("/webhook")
public void webhook(@RequestBody Map<String, Object> params){
logger.info("emqx 觸發 webhook,請求體數據={}",params);
String action = (String) params.get("action");
String clientId = (String) params.get("clientid");
if(action.equals("client_connected")){
//客戶端成功接入
clientStatusMap.put(clientId,true);
}
if(action.equals("client_disconnected")){
//客戶端斷開連接
clientStatusMap.put(clientId,false);
}
}
@GetMapping("/getall")
public Map<String,Boolean> getAllStatus(){
return clientStatusMap;
}
}
打包上傳到服務器中,並運行。
測試WebHook
使用客戶端工具,連接,訂閱主題和發送消息,觀察控制台輸出。