EMQ X 插件持久化系列 (五)MySQL MQTT 數據存儲


本文以在 CentOS 7.2 中的實際例子來說明如何通過 MySQL 來存儲相關的 MQTT 數據。

MySQL 屬於傳統的關系型數據庫產品,其開放式的架構使得用戶的選擇性很強,而且隨着技術的逐漸成熟,MySQL 支持的功能也越來越多,性能也在不斷地提高,對平台的支持也在增多,此外,社區的開發與維護人數也很多。當下,MySQL 因為其功能穩定、性能卓越,且在遵守 GPL 協議的前提下,可以免費使用與修改,因此深受用戶喜愛。

安裝與驗證 MySQL 服務器

讀者可以參考 MySQL 官方文檔 或使用 Docker 來下載安裝 MySQL 服務器,本文章使用 MySQL 5.6 版本。

為方便管理操作,可下載使用官方免費圖形化管理軟件 MySQL Workbeanch

如果讀者使用的是 MySQL 8.0 及以上版本,MySQL 需按照 EMQ X 無法連接 MySQL 8.0 教程特殊配置。

准備

初始化數據表

插件運行依賴以下幾張數據表,數據表需要用戶自行創建,表結構不可改動。

mqtt_client 存儲設備在線狀態

DROP TABLE IF EXISTS `mqtt_client`;
CREATE TABLE `mqtt_client` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `clientid` varchar(64) DEFAULT NULL,
  `state` varchar(3) DEFAULT NULL, -- 在線狀態 0 離線 1 在線
  `node` varchar(100) DEFAULT NULL, -- 所屬節點
  `online_at` datetime DEFAULT NULL, -- 上線時間
  `offline_at` datetime DEFAULT NULL, -- 下線時間
  `created` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `mqtt_client_idx` (`clientid`),
  UNIQUE KEY `mqtt_client_key` (`clientid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

mqtt_sub 存儲設備的主題訂閱關系

DROP TABLE IF EXISTS `mqtt_sub`;
CREATE TABLE `mqtt_sub` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `clientid` varchar(64) DEFAULT NULL,
  `topic` varchar(255) DEFAULT NULL,
  `qos` int(3) DEFAULT NULL,
  `created` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  KEY `mqtt_sub_idx` (`clientid`,`topic`(255),`qos`),
  UNIQUE KEY `mqtt_sub_key` (`clientid`,`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

mqtt_msg 存儲 MQTT 消息

DROP TABLE IF EXISTS `mqtt_msg`;
CREATE TABLE `mqtt_msg` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `msgid` varchar(100) DEFAULT NULL,
  `topic` varchar(1024) NOT NULL,
  `sender` varchar(1024) DEFAULT NULL,
  `node` varchar(60) DEFAULT NULL,
  `qos` int(11) NOT NULL DEFAULT '0',
  `retain` tinyint(2) DEFAULT NULL,
  `payload` blob,
  `arrived` datetime NOT NULL, -- 是否抵達(QoS > 0)
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

mqtt_retain 存儲 Retain 消息

DROP TABLE IF EXISTS `mqtt_retain`;
CREATE TABLE `mqtt_retain` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `topic` varchar(200) DEFAULT NULL,
  `msgid` varchar(60) DEFAULT NULL,
  `sender` varchar(100) DEFAULT NULL,
  `node` varchar(100) DEFAULT NULL,
  `qos` int(2) DEFAULT NULL,
  `payload` blob,
  `arrived` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`),
  UNIQUE KEY `mqtt_retain_key` (`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

mqtt_acked 存儲客戶端消息確認

DROP TABLE IF EXISTS `mqtt_acked`;
CREATE TABLE `mqtt_acked` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `clientid` varchar(200) DEFAULT NULL,
  `topic` varchar(200) DEFAULT NULL,
  `mid` int(200) DEFAULT NULL,
  `created` timestamp NULL DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `mqtt_acked_key` (`clientid`,`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

配置 EMQ X 服務器

通過 RPM 方式安裝的 EMQ X,MySQL 相關的配置文件位於 /etc/emqx/plugins/emqx_backend_mysql.conf,本文僅測試 MySQL 持久化的功能,大部分配置不需要做更改。填入用戶名、密碼、數據庫即可:

auth.mysql.server = 127.0.0.1:3306

auth.mysql.username = root

auth.mysql.password = 123456

auth.mysql.database = mqtt

保持剩下部分的配置文件不變,然后需要啟動該插件。啟動插件的方式有 命令行控制台REST API 三種方式,讀者可以任選其一。

通過命令行啟動

emqx_ctl plugins load emqx_backend_mysql

通過管理控制台啟動

EMQ X 管理控制台 插件 頁面中,找到 emqx_backend_mysql 插件,點擊 啟動

通過 REST API 啟動

使用 PUT /api/v4/nodes/:node/plugins/:plugin_name/load API 可以啟動插件。

客戶端在線狀態存儲

客戶端上下線時,插件將更新在線狀態、上下線時間、節點客戶端列表至 MySQL 數據庫。

配置項

打開配置文件,配置 Backend 規則:

## hook: client.connected、client.disconnected
## action/function: on_client_connected、on_client_disconnected


## 客戶端上下線
backend.mysql.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}

backend.mysql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

使用示例

瀏覽器打開 http://127.0.0.1:18083 EMQ X 管理控制台,在 工具 -> Websocket 中新建一個客戶端連接,指定 clientid 為 sub_client,點擊連接,連接成功后手動斷開:

image20181116105333637.png

在 MySQL Workbeanch 中點擊 mqtt_client 表查看,此時將寫入 / 更新一條客戶端上下線記錄:

image20181119105034528.png

客戶端代理訂閱

客戶端上線時,存儲模塊直接從數據庫讀取預設待訂閱列表,代理加載訂閱主題。在客戶端需要通過預定主題通信(接收消息)場景下,應用能從數據層面設定 / 改變代理訂閱列表。

配置項

打開配置文件,配置 Backend 規則:

## hook: client.connected
## action/function: on_subscribe_lookup
backend.mysql.hook.client.connected.2    = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

使用示例

sub_client 設備上線時,需要為其訂閱 sub_client/upstreamsub_client/downlink 兩個 QoS 1 的主題:

  1. mqtt_sub 表中初始化插入代理訂閱主題信息:
insert into mqtt_sub(clientid, topic, qos) values("sub_client", "sub_client/upstream", 1);
insert into mqtt_sub(clientid, topic, qos) values("sub_client", "sub_client/downlink", 1);
  1. EMQ X 管理控制台 WebSocket 頁面,以 clientid sub_client 新建一個客戶端連接,切換至訂閱頁面,可見當前客戶端自動訂閱了 sub_client/upstreamsub_client/downlink 兩個 QoS 1 的主題:

image20181116110036523.png

  1. 切換回管理控制台 WebSocket 頁面,向 sub_client/downlink 主題發布消息,可在消息訂閱列表收到發布的消息。

持久化發布消息

配置項

打開配置文件,配置 Backend 規則,支持使用 topic 參數進行消息過濾,此處使用 # 通配符存儲任意主題消息:

## hook: message.publish
## action/function: on_message_publish

backend.mysql.hook.message.publish.1     = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

使用示例

在 EMQ X 管理控制台 WebSocket 頁面中,向主題 upstream_topic 發布多條消息,EMQ X 將消息列表持久化至 mqtt_msg 表中:

image20181119110712267.png

暫只支持 QoS 1 2 的消息持久化。

Retain 消息持久化

配置項

打開配置文件,配置 Backend 規則:

## 同時開啟以下規則,啟用 retain 持久化三個生命周期

## 發布非空 retain 消息時 (存儲)
backend.mysql.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

## 設備訂閱主題時查詢 retain 消息
backend.mysql.hook.session.subscribed.2  = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}

## 發布空 retain 消息時 (清除)
backend.mysql.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

使用示例

在 EMQ X 管理控制台 WebSocket 頁面中建立連接后,發布消息勾選保留

image20181119111926675.png

發布(消息不為空)

非空的 retain 消息發布時,EMQ X 將以 topic 為唯一鍵,持久化該條消息至 mqtt_retain 表中,相同主題下發不同的 retain 消息,只有最后一條消息會被持久化:

image20181119164153931.png

訂閱

客戶端訂閱 retain 主題后,EMQ X 將查詢 mqtt_retain 數據表,執行投遞 retain 消息操作。

發布(消息為空)

MQTT 協議中,發布空的 retain 消息將清空 retain 記錄,此時 retain 記錄將從 mqtt_retain 表中刪除。

消息確認持久化

開啟消息確認 (ACK) 持久化后,客戶端訂閱 QoS 1、QoS 2 級別的主題時,EMQ X 將在數據庫以 clientid + topic 為唯一鍵初始化 ACK 記錄。

配置項

打開配置文件,配置 Backend 規則,可使用 topic 通配符 過濾要應用的消息:

## 訂閱時初始化 ACK 記錄
backend.mysql.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}


## 消息抵達時更新抵達狀態
backend.mysql.hook.message.acked.1       = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}

## 取消訂閱時刪除記錄行
backend.mysql.hook.session.unsubscribed.1= {"topic": "#", "action": {"sql": ["delete from mqtt_acked where clientid = ${clientid} and topic = ${topic}"]}, "pool": "pool1"}

使用示例

在 EMQ X 管理控制台 WebSocket 頁面中建立連接后,訂閱 QoS > 0 的主題:

image20181119140251843.png

此時 mqtt_acked 表將插入初始化數據行,每向主題發布一條 QoS > 0 的消息,消息抵達后數據行 mid 將自增 1:

image20181119140354855.png

代理訂閱中滿足 QoS > 0 的 topic 也會初始化記錄,客戶端取消訂閱后相關記錄將被刪除。

自定義 SQL

除去插件內置函數、表結構外,emqx_backend_mysql 還支持自定義 SQL 語句,通過使用如 ${clientid} 模板語法動態構造 SQL 語句實現如客戶端連接歷史、更新自定義數據表等操作。

SQL語句參數說明

hook 可用參數 示例(sql語句中${name} 表示可獲取的參數)
client.connected clientid insert into conn(clientid) values(${clientid})
client.disconnected clientid insert into disconn(clientid) values(${clientid})
session.subscribed clientid, topic, qos insert into sub(topic, qos) values(${topic}, ${qos})
session.unsubscribed clientid, topic delete from sub where topic = ${topic}
message.publish msgid, topic, payload, qos, clientid insert into msg(msgid, topic) values(${msgid}, ${topic})
message.acked msgid, topic, clientid insert into ack(msgid, topic) values(${msgid}, ${topic})
message.delivered msgid, topic, clientid insert into delivered(msgid, topic) values(${msgid}, ${topic})

客戶端連接 log 示例

設計表結構如下:

CREATE TABLE `mqtt`.`connect_logs` (
  `id` INT NOT NULL AUTO_INCREMENT,
  `clientid` VARCHAR(255) NULL,
  `created_at` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, -- 記錄時間
  `state` INT NOT NULL DEFAULT 0,  -- 記錄類型: 0 下線 1 上線
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

自定義 SQL:

## connected hook 中配置自定義 SQL
## 可以配置多條 SQL 語句 "SQL": ["sql_a", "sql_b", "sql_c"]

## 連接時
backend.mysql.hook.client.connected.3 = {"action": {"sql": ["insert into connect_logs(clientid, state) values(${clientid}, 1)"]}, "pool": "pool1"}

## 斷開時
backend.mysql.hook.client.disconnected.3 = {"action": {"sql": ["insert into connect_logs(clientid, state) values(${clientid}, 0)"]}, "pool": "pool1"}

客戶端上下線時將填充並執行預定的 SQL 語句,將連接記錄寫入 connect_logs 表。

image20181119154828728.png

高級選項

backend.mysql.time_range = 5s

backend.mysql.max_returned_count = 500

總結

讀者在理解了 MySQL 中所存儲的數據結構、自定義 SQL 之后,可以結合 MySQL 拓展相關應用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM