本文以在 CentOS 7.2 中的實際例子來說明如何通過 MySQL 來存儲相關的 MQTT 數據。
MySQL 屬於傳統的關系型數據庫產品,其開放式的架構使得用戶的選擇性很強,而且隨着技術的逐漸成熟,MySQL 支持的功能也越來越多,性能也在不斷地提高,對平台的支持也在增多,此外,社區的開發與維護人數也很多。當下,MySQL 因為其功能穩定、性能卓越,且在遵守 GPL 協議的前提下,可以免費使用與修改,因此深受用戶喜愛。
安裝與驗證 MySQL 服務器
讀者可以參考 MySQL 官方文檔 或使用 Docker 來下載安裝 MySQL 服務器,本文章使用 MySQL 5.6 版本。
為方便管理操作,可下載使用官方免費圖形化管理軟件 MySQL Workbeanch。
如果讀者使用的是 MySQL 8.0 及以上版本,MySQL 需按照 EMQ X 無法連接 MySQL 8.0 教程特殊配置。
准備
初始化數據表
插件運行依賴以下幾張數據表,數據表需要用戶自行創建,表結構不可改動。
mqtt_client 存儲設備在線狀態
DROP TABLE IF EXISTS `mqtt_client`;
CREATE TABLE `mqtt_client` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`clientid` varchar(64) DEFAULT NULL,
`state` varchar(3) DEFAULT NULL, -- 在線狀態 0 離線 1 在線
`node` varchar(100) DEFAULT NULL, -- 所屬節點
`online_at` datetime DEFAULT NULL, -- 上線時間
`offline_at` datetime DEFAULT NULL, -- 下線時間
`created` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `mqtt_client_idx` (`clientid`),
UNIQUE KEY `mqtt_client_key` (`clientid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
mqtt_sub 存儲設備的主題訂閱關系
DROP TABLE IF EXISTS `mqtt_sub`;
CREATE TABLE `mqtt_sub` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`clientid` varchar(64) DEFAULT NULL,
`topic` varchar(255) DEFAULT NULL,
`qos` int(3) DEFAULT NULL,
`created` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
KEY `mqtt_sub_idx` (`clientid`,`topic`(255),`qos`),
UNIQUE KEY `mqtt_sub_key` (`clientid`,`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
mqtt_msg 存儲 MQTT 消息
DROP TABLE IF EXISTS `mqtt_msg`;
CREATE TABLE `mqtt_msg` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`msgid` varchar(100) DEFAULT NULL,
`topic` varchar(1024) NOT NULL,
`sender` varchar(1024) DEFAULT NULL,
`node` varchar(60) DEFAULT NULL,
`qos` int(11) NOT NULL DEFAULT '0',
`retain` tinyint(2) DEFAULT NULL,
`payload` blob,
`arrived` datetime NOT NULL, -- 是否抵達(QoS > 0)
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
mqtt_retain 存儲 Retain 消息
DROP TABLE IF EXISTS `mqtt_retain`;
CREATE TABLE `mqtt_retain` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`topic` varchar(200) DEFAULT NULL,
`msgid` varchar(60) DEFAULT NULL,
`sender` varchar(100) DEFAULT NULL,
`node` varchar(100) DEFAULT NULL,
`qos` int(2) DEFAULT NULL,
`payload` blob,
`arrived` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (`id`),
UNIQUE KEY `mqtt_retain_key` (`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
mqtt_acked 存儲客戶端消息確認
DROP TABLE IF EXISTS `mqtt_acked`;
CREATE TABLE `mqtt_acked` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`clientid` varchar(200) DEFAULT NULL,
`topic` varchar(200) DEFAULT NULL,
`mid` int(200) DEFAULT NULL,
`created` timestamp NULL DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `mqtt_acked_key` (`clientid`,`topic`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
配置 EMQ X 服務器
通過 RPM 方式安裝的 EMQ X,MySQL 相關的配置文件位於 /etc/emqx/plugins/emqx_backend_mysql.conf,本文僅測試 MySQL 持久化的功能,大部分配置不需要做更改。填入用戶名、密碼、數據庫即可:
auth.mysql.server = 127.0.0.1:3306
auth.mysql.username = root
auth.mysql.password = 123456
auth.mysql.database = mqtt
保持剩下部分的配置文件不變,然后需要啟動該插件。啟動插件的方式有 命令行、 控制台 和 REST API 三種方式,讀者可以任選其一。
通過命令行啟動
emqx_ctl plugins load emqx_backend_mysql
通過管理控制台啟動
EMQ X 管理控制台 插件 頁面中,找到 emqx_backend_mysql 插件,點擊 啟動。
通過 REST API 啟動
使用 PUT /api/v4/nodes/:node/plugins/:plugin_name/load API 可以啟動插件。
客戶端在線狀態存儲
客戶端上下線時,插件將更新在線狀態、上下線時間、節點客戶端列表至 MySQL 數據庫。
配置項
打開配置文件,配置 Backend 規則:
## hook: client.connected、client.disconnected
## action/function: on_client_connected、on_client_disconnected
## 客戶端上下線
backend.mysql.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
backend.mysql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
使用示例
瀏覽器打開 http://127.0.0.1:18083 EMQ X 管理控制台,在 工具 -> Websocket 中新建一個客戶端連接,指定 clientid 為 sub_client,點擊連接,連接成功后手動斷開:

在 MySQL Workbeanch 中點擊 mqtt_client 表查看,此時將寫入 / 更新一條客戶端上下線記錄:

客戶端代理訂閱
客戶端上線時,存儲模塊直接從數據庫讀取預設待訂閱列表,代理加載訂閱主題。在客戶端需要通過預定主題通信(接收消息)場景下,應用能從數據層面設定 / 改變代理訂閱列表。
配置項
打開配置文件,配置 Backend 規則:
## hook: client.connected
## action/function: on_subscribe_lookup
backend.mysql.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
使用示例
當 sub_client 設備上線時,需要為其訂閱 sub_client/upstream 與 sub_client/downlink 兩個 QoS 1 的主題:
- 在
mqtt_sub表中初始化插入代理訂閱主題信息:
insert into mqtt_sub(clientid, topic, qos) values("sub_client", "sub_client/upstream", 1);
insert into mqtt_sub(clientid, topic, qos) values("sub_client", "sub_client/downlink", 1);
- EMQ X 管理控制台 WebSocket 頁面,以 clientid
sub_client新建一個客戶端連接,切換至訂閱頁面,可見當前客戶端自動訂閱了sub_client/upstream與sub_client/downlink兩個 QoS 1 的主題:

- 切換回管理控制台 WebSocket 頁面,向
sub_client/downlink主題發布消息,可在消息訂閱列表收到發布的消息。
持久化發布消息
配置項
打開配置文件,配置 Backend 規則,支持使用 topic 參數進行消息過濾,此處使用 # 通配符存儲任意主題消息:
## hook: message.publish
## action/function: on_message_publish
backend.mysql.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
使用示例
在 EMQ X 管理控制台 WebSocket 頁面中,向主題 upstream_topic 發布多條消息,EMQ X 將消息列表持久化至 mqtt_msg 表中:

暫只支持 QoS 1 2 的消息持久化。
Retain 消息持久化
配置項
打開配置文件,配置 Backend 規則:
## 同時開啟以下規則,啟用 retain 持久化三個生命周期
## 發布非空 retain 消息時 (存儲)
backend.mysql.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
## 設備訂閱主題時查詢 retain 消息
backend.mysql.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
## 發布空 retain 消息時 (清除)
backend.mysql.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
使用示例
在 EMQ X 管理控制台 WebSocket 頁面中建立連接后,發布消息勾選保留:

發布(消息不為空)
非空的 retain 消息發布時,EMQ X 將以 topic 為唯一鍵,持久化該條消息至 mqtt_retain 表中,相同主題下發不同的 retain 消息,只有最后一條消息會被持久化:

訂閱
客戶端訂閱 retain 主題后,EMQ X 將查詢 mqtt_retain 數據表,執行投遞 retain 消息操作。
發布(消息為空)
MQTT 協議中,發布空的 retain 消息將清空 retain 記錄,此時 retain 記錄將從 mqtt_retain 表中刪除。
消息確認持久化
開啟消息確認 (ACK) 持久化后,客戶端訂閱 QoS 1、QoS 2 級別的主題時,EMQ X 將在數據庫以 clientid + topic 為唯一鍵初始化 ACK 記錄。
配置項
打開配置文件,配置 Backend 規則,可使用 topic 通配符 過濾要應用的消息:
## 訂閱時初始化 ACK 記錄
backend.mysql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
## 消息抵達時更新抵達狀態
backend.mysql.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
## 取消訂閱時刪除記錄行
backend.mysql.hook.session.unsubscribed.1= {"topic": "#", "action": {"sql": ["delete from mqtt_acked where clientid = ${clientid} and topic = ${topic}"]}, "pool": "pool1"}
使用示例
在 EMQ X 管理控制台 WebSocket 頁面中建立連接后,訂閱 QoS > 0 的主題:

此時 mqtt_acked 表將插入初始化數據行,每向主題發布一條 QoS > 0 的消息,消息抵達后數據行 mid 將自增 1:

代理訂閱中滿足 QoS > 0 的 topic 也會初始化記錄,客戶端取消訂閱后相關記錄將被刪除。
自定義 SQL
除去插件內置函數、表結構外,emqx_backend_mysql 還支持自定義 SQL 語句,通過使用如 ${clientid} 模板語法動態構造 SQL 語句實現如客戶端連接歷史、更新自定義數據表等操作。
SQL語句參數說明
| hook | 可用參數 | 示例(sql語句中${name} 表示可獲取的參數) |
|---|---|---|
| client.connected | clientid | insert into conn(clientid) values(${clientid}) |
| client.disconnected | clientid | insert into disconn(clientid) values(${clientid}) |
| session.subscribed | clientid, topic, qos | insert into sub(topic, qos) values(${topic}, ${qos}) |
| session.unsubscribed | clientid, topic | delete from sub where topic = ${topic} |
| message.publish | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic}) |
| message.acked | msgid, topic, clientid | insert into ack(msgid, topic) values(${msgid}, ${topic}) |
| message.delivered | msgid, topic, clientid | insert into delivered(msgid, topic) values(${msgid}, ${topic}) |
客戶端連接 log 示例
設計表結構如下:
CREATE TABLE `mqtt`.`connect_logs` (
`id` INT NOT NULL AUTO_INCREMENT,
`clientid` VARCHAR(255) NULL,
`created_at` TIMESTAMP NULL DEFAULT CURRENT_TIMESTAMP, -- 記錄時間
`state` INT NOT NULL DEFAULT 0, -- 記錄類型: 0 下線 1 上線
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
自定義 SQL:
## connected hook 中配置自定義 SQL
## 可以配置多條 SQL 語句 "SQL": ["sql_a", "sql_b", "sql_c"]
## 連接時
backend.mysql.hook.client.connected.3 = {"action": {"sql": ["insert into connect_logs(clientid, state) values(${clientid}, 1)"]}, "pool": "pool1"}
## 斷開時
backend.mysql.hook.client.disconnected.3 = {"action": {"sql": ["insert into connect_logs(clientid, state) values(${clientid}, 0)"]}, "pool": "pool1"}
客戶端上下線時將填充並執行預定的 SQL 語句,將連接記錄寫入 connect_logs 表。

高級選項
backend.mysql.time_range = 5s
backend.mysql.max_returned_count = 500
總結
讀者在理解了 MySQL 中所存儲的數據結構、自定義 SQL 之后,可以結合 MySQL 拓展相關應用。
