從 v4.1 版本開始,EMQ X MQTT 服務器 提供了專門的多語言支持插件 emqx_extension_hook ,現已支持使用其他編程語言來處理 EMQ X 中的鈎子事件,開發者可以使用 Python 或者 Java 快速開發自己的插件,在官方功能的基礎上進行擴展,滿足自己的業務場景。例如:
- 驗證某客戶端的登錄權限:客戶端連接時觸發對應函數,通過參數獲取客戶端信息后通過讀取數據庫、比對等操作判定是否有登錄權限
- 記錄客戶端在線狀態與上下線歷史:客戶端狀態變動時觸發對應函數,通過參數獲取客戶端信息,改寫數據庫中客戶端在線狀態
- 校驗某客戶端的 PUB/SUB 的操作權限:發布/訂閱時觸發對應函數,通過參數獲取客戶端信息與當前主題,判定客戶端是否有對應的操作權限
- 處理會話 (Sessions) 和 消息 (Message) 事件,實現訂閱關系與消息處理/存儲:消息發布、狀態變動時觸發對應函數,獲取當前客戶端信息、消息狀態與消息內容,轉發到 Kafka 或數據庫進行存儲。
注:消息(Message) 類鈎子,僅在企業版中支持。
Python 和 Java 驅動基於 Erlang/OTP-Port 進程間通信實現,本身具有非常高的吞吐性能,本文以 Java 拓展為例介紹 EMQ X 跨語言拓展使用方式。
Java 拓展使用示例
要求
- EMQ X 所在服務器需安裝 JDK 1.8 以上版本
開始使用
- 創建 Java 項目
- 下載 io.emqx.extension.jar 和 erlport.jar 文件
- 添加SDK
io.emqx.extension.jar
和erlport.jar
到項目依賴 - 復制
examples/SampleHandler.java
到您的項目中 - 根據 SDK
SampleHandler.java
中的示例編寫業務代碼,確保能夠成功編譯
部署
編譯所有源代碼后,需要將 sdk
和代碼文件部署到 EMQ X 中:
- 復制
io.emqx.extension.jar
到emqx/data/extension
目錄 - 將編譯后的
.class
文件,例如SampleHandler.class
復制到emqx/data/extension
目錄 - 修改
emqx/etc/plugins/emqx_extension_hook.conf
配置文件:
exhook.drivers = java
## Search path for scripts or library
exhook.drivers.java.path = data/extension/
exhook.drivers.java.init_module = SampleHandler
啟動 emqx_extension_hook
插件,如果配置錯誤或 Java 代碼編寫錯誤將無法正常啟動。啟動后嘗試建立 MQTT 連接並觀察業務運行情況。
示例
以下為 SampleHandler.java 示例程序, 該程序繼承自 SDK 中的 DefaultCommunicationHandler
類。該示例代碼演示了如何掛載 EMQ X 系統中所有的鈎子:
import emqx.extension.java.handler.*;
import emqx.extension.java.handler.codec.*;
import emqx.extension.java.handler.ActionOptionConfig.Keys;
public class SampleHandler extends DefaultCommunicationHandler {
@Override
public ActionOptionConfig getActionOption() {
ActionOptionConfig option = new ActionOptionConfig();
option.set(Keys.MESSAGE_PUBLISH_TOPICS, "#");
option.set(Keys.MESSAGE_DELIVERED_TOPICS, "#");
option.set(Keys.MESSAGE_ACKED_TOPICS, "#");
option.set(Keys.MESSAGE_DROPPED_TOPICS, "#");
return option;
}
// Clients
@Override
public void onClientConnect(ConnInfo connInfo, Property[] props) {
System.err.printf("[Java] onClientConnect: connInfo: %s, props: %s\n", connInfo, props);
}
@Override
public void onClientConnack(ConnInfo connInfo, ReturnCode rc, Property[] props) {
System.err.printf("[Java] onClientConnack: connInfo: %s, rc: %s, props: %s\n", connInfo, rc, props);
}
@Override
public void onClientConnected(ClientInfo clientInfo) {
System.err.printf("[Java] onClientConnected: clientinfo: %s\n", clientInfo);
}
@Override
public void onClientDisconnected(ClientInfo clientInfo, Reason reason) {
System.err.printf("[Java] onClientDisconnected: clientinfo: %s, reason: %s\n", clientInfo, reason);
}
// 判定認證結果,返回 true 或 false
@Override
public boolean onClientAuthenticate(ClientInfo clientInfo, boolean authresult) {
System.err.printf("[Java] onClientAuthenticate: clientinfo: %s, authresult: %s\n", clientInfo, authresult);
return true;
}
// 判定 ACL 檢查結果,返回 true 或 false
@Override
public boolean onClientCheckAcl(ClientInfo clientInfo, PubSub pubsub, Topic topic, boolean result) {
System.err.printf("[Java] onClientCheckAcl: clientinfo: %s, pubsub: %s, topic: %s, result: %s\n", clientInfo, pubsub, topic, result);
return true;
}
@Override
public void onClientSubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) {
System.err.printf("[Java] onClientSubscribe: clientinfo: %s, topic: %s, props: %s\n", clientInfo, topic, props);
}
@Override
public void onClientUnsubscribe(ClientInfo clientInfo, Property[] props, TopicFilter[] topic) {
System.err.printf("[Java] onClientUnsubscribe: clientinfo: %s, topic: %s, props: %s\n", clientInfo, topic, props);
}
// Sessions
@Override
public void onSessionCreated(ClientInfo clientInfo) {
System.err.printf("[Java] onSessionCreated: clientinfo: %s\n", clientInfo);
}
@Override
public void onSessionSubscribed(ClientInfo clientInfo, Topic topic, SubscribeOption opts) {
System.err.printf("[Java] onSessionSubscribed: clientinfo: %s, topic: %s\n", clientInfo, topic);
}
@Override
public void onSessionUnsubscribed(ClientInfo clientInfo, Topic topic) {
System.err.printf("[Java] onSessionUnsubscribed: clientinfo: %s, topic: %s\n", clientInfo, topic);
}
@Override
public void onSessionResumed(ClientInfo clientInfo) {
System.err.printf("[Java] onSessionResumed: clientinfo: %s\n", clientInfo);
}
@Override
public void onSessionDiscarded(ClientInfo clientInfo) {
System.err.printf("[Java] onSessionDiscarded: clientinfo: %s\n", clientInfo);
}
@Override
public void onSessionTakeovered(ClientInfo clientInfo) {
System.err.printf("[Java] onSessionTakeovered: clientinfo: %s\n", clientInfo);
}
@Override
public void onSessionTerminated(ClientInfo clientInfo, Reason reason) {
System.err.printf("[Java] onSessionTerminated: clientinfo: %s, reason: %s\n", clientInfo, reason);
}
// Messages
@Override
public Message onMessagePublish(Message message) {
System.err.printf("[Java] onMessagePublish: message: %s\n", message);
return message;
}
@Override
public void onMessageDropped(Message message, Reason reason) {
System.err.printf("[Java] onMessageDropped: message: %s, reason: %s\n", message, reason);
}
@Override
public void onMessageDelivered(ClientInfo clientInfo, Message message) {
System.err.printf("[Java] onMessageDelivered: clientinfo: %s, message: %s\n", clientInfo, message);
}
@Override
public void onMessageAcked(ClientInfo clientInfo, Message message) {
System.err.printf("[Java] onMessageAcked: clientinfo: %s, message: %s\n", clientInfo, message);
}
}
SampleHandler
主要包含兩部分:
-
重載了
getActionOption
方法。該方法對消息(Message)相關的鈎子進行配置,指定了需要生效的主題列表。配置項 對應鈎子 MESSAGE_PUBLISH_TOPICS message_publish MESSAGE_DELIVERED_TOPICS message_delivered MESSAGE_ACKED_TOPICS message_acked MESSAGE_DROPPED_TOPICS message_dropped -
重載了
on<hookName>
方法,這些方法是實際處理鈎子事件的回調函數,函數命名方式為各個鈎子名稱變體后前面加on
前綴,變體方式為鈎子名稱去掉下划線后使用駱駝拼寫法(CamelCase),例如,鈎子client_connect對應的函數名為onClientConnect。 EMQ X 客戶端產生的事件,例如:連接、發布、訂閱等,都會最終分發到這些鈎子事件回調函數上,然后回調函數可對各屬性及狀態進行相關操作。 示例程序中僅對各參數進行了打印輸出。如果只關心部分鈎子事件,只需對這部分鈎子事件的回調函數進行重載即可,不需要重載所有回調函數。
各回調函數的執行時機和支持的鈎子列表與 EMQ X 內置的鈎子完全一致,參見:Hooks - EMQ X
在實現自己的擴展程序時,最簡單的方式也是繼承 DefaultCommunicationHandler
父類,該類對各鈎子與回調函數的綁定進行了封裝,並進一步封裝了回調函數涉及到的參數數據結構,以方便快速上手使用。
進階開發
如果對 Java 擴展程序的可控性要求更高,DefaultCommunicationHandler
類已無法滿足需求時,可以通過實現 CommunicationHandler
接口,從更底層控制代碼邏輯,編寫更靈活的擴展程序。
package emqx.extension.java.handler;
public interface CommunicationHandler {
public Object init();
public void deinit();
}
init()
方法:用於初始化,聲明擴展需要掛載哪些鈎子,以及掛載的配置deinit()
方法:用於注銷。
詳細數據格式說明,參見 設計文檔。
版權聲明: 本文為 EMQ 原創,轉載請注明出處。
原文鏈接:https://www.emqx.io/cn/blog/develop-emqx-plugin-using-java