本章節以在 CentOS 7.2
中的實際例子來說明如何通過 PostgreSQL 來存儲相關的信息。
作為開源關系數據庫重要一員,PostgreSQL 標榜自己是世界上最先進的開源數據庫,相比於其他開源關系數據庫如 MySQL,PostgreSQL 是完全由社區驅動的開源項目,由全世界超過 1000 名貢獻者所維護。PostgreSQL 提供了單個完整功能的版本,而不像 MySQL 那樣提供了多個不同的社區版、商業版與企業版。PostgreSQL 基於自由的 BSD/MIT 許可,組織可以使用、復制、修改和重新分發代碼,只需要提供一個版權聲明即可。
PostgreSQL 具有諸多特性,在 GIS 領域有較多支持,其“無鎖定”特性非常突出,支持函數和條件索引,有成熟的集群方案。PostgreSQL 還具備及其強悍的 SQL 編程能力如統計函數和統計語法支持,通過 Timescaledb 插件,PostgreSQL 可以轉變為功能完備的時序數據庫 Timescaledb 。
功能概覽
- 客戶端在線狀態存儲
- 客戶端代理訂閱
- 持久化發布消息
- Retain 消息持久化
- 消息確認持久化
- 自定義 SQL
安裝與驗證 PostgreSQL 服務器
讀者可以參考 PostgreSQL 官方文檔 或 Docker 來下載安裝PostgreSQL 服務器,本文章使用 PostgreSQL 10.1 版本。
為方便管理操作,可下載使用免費圖形化管理軟件 Postico(僅限 MacOS)或 pgAdmin。
配置 EMQ X 服務器
通過 RPM 方式安裝的 EMQ X,PostgreSQL 相關的配置文件位於 /etc/emqx/plugins/emqx_backend_pgsql.conf
,如果只是測試 PostgreSQL 持久化的功能,大部分配置不需要做更改,填入用戶名、密碼、數據庫即可:
backend.pgsql.pool1.server = 127.0.0.1:5432
backend.pgsql.pool1.pool_size = 8
backend.pgsql.pool1.username = root
backend.pgsql.pool1.password = public
backend.pgsql.pool1.database = mqtt
backend.pgsql.pool1.ssl = false
保持剩下部分的配置文件不變,然后需要啟動該插件。啟動插件的方式有 命令行
和 控制台
兩種方式,讀者可以任選其一。
通過命令行啟動
emqx_ctl plugins load emqx_backend_pgsql
通過管理控制台啟動
EMQ X 管理控制台 插件 頁面中,找到 emqx_backend_pgsql 插件,點擊 啟動。
客戶端在線狀態存儲
客戶端上下線時,插件將更新在線狀態、上下線時間、節點客戶端列表至 PostgreSQL 數據庫。
數據表
創建 mqtt_client 設備在線狀態表:
CREATE TABLE mqtt_client(
id SERIAL primary key,
clientid character varying(100),
state integer, -- 在線狀態: 0 離線 1 在線
node character varying(100), -- 接入節點名稱
online_at timestamp, -- 上線時間
offline_at timestamp, -- 下線時間
created timestamp without time zone,
UNIQUE (clientid)
);
配置項
打開配置文件,配置 Backend 規則:
## hook: client.connected、client.disconnected
## action/function: on_client_connected、on_client_disconnected
## 客戶端上下線
backend.pgsql.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}
## 客戶端下線
backend.pgsql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}
使用示例
瀏覽器打開 http://127.0.0.1:18083
EMQ X 管理控制台,在 工具 -> Websocket 中新建一個客戶端連接,指定 clientid 為 sub_client,點擊連接,連接成功后手動斷開:
查看 mqtt_client
表,此時將寫入 / 更新一條客戶端上下線記錄:
客戶端代理訂閱
客戶端上線時,存儲模塊直接從數據庫讀取預設待訂閱列表,代理加載訂閱主題。在客戶端需要通過預定主題通信(接收消息)場景下,應用能從數據層面設定 / 改變代理訂閱列表。
數據表
創建 mqtt_sub 代理訂閱關系表:
CREATE TABLE mqtt_sub(
id SERIAL primary key,
clientid character varying(100),
topic character varying(200), -- topic
qos integer, -- QoS
created timestamp without time zone,
UNIQUE (clientid, topic)
);
配置項
打開配置文件,配置 Backend 規則:
## hook: client.connected
## action/function: on_subscribe_lookup
backend.pgsql.hook.client.connected.2 = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}
使用示例
當 sub_client
設備上線時,需要為其訂閱 sub_client/upstream
與 sub_client/downlink
兩個 QoS 1 的主題:
- 在
mqtt_sub
表中初始化插入代理訂閱主題信息:
insert into mqtt_sub(clientid, topic, qos) values('sub_client', 'sub_client/upstream', 1);
insert into mqtt_sub(clientid, topic, qos) values('sub_client', 'sub_client/downlink', 1);
- EMQ X 管理控制台 WebSocket 頁面,以 clientid
sub_client
新建一個客戶端連接,切換至訂閱頁面,可見當前客戶端自動訂閱了sub_client/upstream
與sub_client/downlink
兩個 QoS 1 的主題:
- 切換回管理控制台 WebSocket 頁面,向
sub_client/downlink
主題發布消息,可在消息訂閱列表收到發布的消息。
持久化發布消息
數據表
創建 mqtt_msg MQTT 消息持久化表:
CREATE TABLE mqtt_msg (
id SERIAL primary key,
msgid character varying(60),
sender character varying(100), -- 消息 pub 的 clientid
topic character varying(200),
qos integer,
retain integer, -- 是否 retain 消息
payload text,
arrived timestamp without time zone -- 消息抵達時間(QoS > 0)
);
配置項
打開配置文件,配置 Backend 規則,支持使用 topic
參數進行消息過濾,此處使用 #
通配符存儲任意主題消息:
## hook: message.publish
## action/function: on_message_publish
backend.pgsql.hook.message.publish.1 = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}
使用示例
在 EMQ X 管理控制台 WebSocket 頁面中,使用 clientdi sub_client
建立連接,向主題 upstream_topic
發布多條消息,EMQ X 將消息列表持久化至 mqtt_msg
表中:
暫只支持 QoS 1 2 的消息持久化。
Retain 消息持久化
表結構
創建 mqtt_retain Retain 消息存儲表:
CREATE TABLE mqtt_retain(
id SERIAL primary key,
topic character varying(200),
msgid character varying(60),
sender character varying(100),
qos integer,
payload text,
arrived timestamp without time zone,
UNIQUE (topic)
);
配置項
打開配置文件,配置 Backend 規則:
## 同時開啟以下規則,啟用 retain 持久化三個生命周期
## 發布非空 retain 消息時 (存儲)
backend.pgsql.hook.message.publish.2 = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}
## 設備訂閱主題時查詢 retain 消息
backend.pgsql.hook.session.subscribed.2 = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}
## 發布空 retain 消息時 (清除)
backend.pgsql.hook.message.publish.3 = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}
使用示例
在 EMQ X 管理控制台 WebSocket 頁面中建立連接后,發布消息勾選保留:
發布(消息不為空)
非空的 retain 消息發布時,EMQ X 將以 topic 為唯一鍵,持久化該條消息至 mqtt_retain
表中,相同主題下發布不同的 retain 消息,只有最后一條消息會被持久化:
訂閱
客戶端訂閱 retain 主題后,EMQ X 將查詢 mqtt_retain
數據表,執行投遞 retain 消息操作。
發布(消息為空)
MQTT 協議中,發布空的 retain 消息將清空 retain 記錄,此時 retain 記錄將從 mqtt_retain
表中刪除。
消息確認持久化
開啟消息確認 (ACK) 持久化后,客戶端訂閱 QoS 1、QoS 2 級別的主題時,EMQ X 將在數據庫以 clientid + topic 為唯一鍵初始化 ACK 記錄。
數據表
創建 mqtt_acked 消息確認表:
CREATE TABLE mqtt_acked (
id SERIAL primary key,
clientid character varying(100),
topic character varying(100),
mid integer,
created timestamp without time zone,
UNIQUE (clientid, topic)
);
配置項
打開配置文件,配置 Backend 規則,可使用 topic 通配符 過濾要應用的消息:
## 訂閱時初始化 ACK 記錄
backend.pgsql.hook.session.subscribed.1 = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}
## 消息抵達時更新抵達狀態
backend.pgsql.hook.message.acked.1 = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}
## 取消訂閱時刪除記錄行
backend.pgsql.hook.session.unsubscribed.1= {"topic": "#", "action": {"sql": ["delete from mqtt_acked where clientid = ${clientid} and topic = ${topic}"]}, "pool": "pool1"}
使用示例
在 EMQ X 管理控制台 WebSocket 頁面中建立連接后,訂閱 QoS > 0 的主題:
此時 mqtt_acked
表將插入初始化數據行,每向主題發布一條 QoS > 0 的消息,消息抵達后數據行 mid 將自增 1:
代理訂閱中滿足 QoS > 0 的 topic 也會初始化記錄,客戶端取消訂閱后相關記錄將被刪除。
自定義 SQL
除去插件內置函數、表結構外,emqx_backend_pgsql 還支持自定義 SQL 語句,通過使用如 ${clientid}
模板語法動態構造 SQL 語句實現如客戶端連接歷史、更新自定義數據表等操作。
SQL語句參數說明
hook | 可用參數 | 示例(sql語句中${name} 表示可獲取的參數) |
---|---|---|
client.connected | clientid | insert into conn(clientid) values(${clientid}) |
client.disconnected | clientid | insert into disconn(clientid) values(${clientid}) |
session.subscribed | clientid, topic, qos | insert into sub(topic, qos) values(${topic}, ${qos}) |
session.unsubscribed | clientid, topic | delete from sub where topic = ${topic} |
message.publish | msgid, topic, payload, qos, clientid | insert into msg(msgid, topic) values(${msgid}, ${topic}) |
message.acked | msgid, topic, clientid | insert into ack(msgid, topic) values(${msgid}, ${topic}) |
message.delivered | msgid, topic, clientid | insert into delivered(msgid, topic) values(${msgid}, ${topic}) |
更新自定義數據表示例
應用現有設備表 clients
,具有設備連接認證、設備狀態記錄、設備管理等基本字段用於其他管理業務,現需要將 EMQ X 設備狀態同步至該表中:
CREATE TABLE "public"."clients" (
"id" serial,
"deviceUsername" varchar(50), -- MQTT username
"client_id" varchar(50), -- MQTT client_id
"password" varchar(50), -- MQTT password
"is_super" boolean DEFAULT 'false', -- 是否 ACL super 客戶端
"owner" int, -- 創建用戶
"productID" int, -- 所屬產品
"state" boolean DEFAULT 'false', -- 在線狀態
PRIMARY KEY ("id")
);
-- 初始化系統中已存在示例數據,此時 state 為 false
INSERT INTO "public"."clients"("deviceUsername", "client_id", "password", "is_super", "owner", "productID", "state") VALUES('mqtt_10c61f1a1f47', 'mqtt_10c61f1a1f47', '9336EBF25087D91C818EE6E9EC29F8C1', TRUE, 1, 21, FALSE);
自定義 UPDATE SQL 語句:
## connected / disconnected hook 中配置自定義 UPDATE SQL
## 可以配置多條 SQL 語句 "SQL": ["sql_a", "sql_b", "sql_c"]
## 連接時
backend.pgsql.hook.client.connected.3 = {"action": {"sql": ["update clients set state = true where client_id = ${clientid}"]}, "pool": "pool1"}
## 斷開時
backend.pgsql.hook.client.disconnected.3 = {"action": {"sql": ["update clients set state = false where client_id = ${clientid}"]}, "pool": "pool1"}
客戶端上線時將填充並執行預定的 SQL 語句,更新設備在線狀態 state
字段為 true
:
高級選項
backend.pgsql.time_range = 5s
backend.pgsql.max_returned_count = 500
總結
讀者在理解了 PostgreSQL 中所存儲的數據結構、自定義 SQL 之后,可以結合 PostgreSQL 拓展相關應用。