EMQ X 插件持久化系列 (四)PostgreSQL 數據持久化


本章節以在 CentOS 7.2 中的實際例子來說明如何通過 PostgreSQL 來存儲相關的信息。

作為開源關系數據庫重要一員,PostgreSQL 標榜自己是世界上最先進的開源數據庫,相比於其他開源關系數據庫如 MySQL,PostgreSQL 是完全由社區驅動的開源項目,由全世界超過 1000 名貢獻者所維護。PostgreSQL 提供了單個完整功能的版本,而不像 MySQL 那樣提供了多個不同的社區版、商業版與企業版。PostgreSQL 基於自由的 BSD/MIT 許可,組織可以使用、復制、修改和重新分發代碼,只需要提供一個版權聲明即可。

PostgreSQL 具有諸多特性,在 GIS 領域有較多支持,其“無鎖定”特性非常突出,支持函數和條件索引,有成熟的集群方案。PostgreSQL 還具備及其強悍的 SQL 編程能力如統計函數和統計語法支持,通過 Timescaledb 插件,PostgreSQL 可以轉變為功能完備的時序數據庫 Timescaledb 。

功能概覽

  • 客戶端在線狀態存儲
  • 客戶端代理訂閱
  • 持久化發布消息
  • Retain 消息持久化
  • 消息確認持久化
  • 自定義 SQL

安裝與驗證 PostgreSQL 服務器

讀者可以參考 PostgreSQL 官方文檔Docker 來下載安裝PostgreSQL 服務器,本文章使用 PostgreSQL 10.1 版本。

為方便管理操作,可下載使用免費圖形化管理軟件 Postico(僅限 MacOS)或 pgAdmin

配置 EMQ X 服務器

通過 RPM 方式安裝的 EMQ X,PostgreSQL 相關的配置文件位於 /etc/emqx/plugins/emqx_backend_pgsql.conf,如果只是測試 PostgreSQL 持久化的功能,大部分配置不需要做更改,填入用戶名、密碼、數據庫即可:

backend.pgsql.pool1.server = 127.0.0.1:5432

backend.pgsql.pool1.pool_size = 8

backend.pgsql.pool1.username = root

backend.pgsql.pool1.password = public

backend.pgsql.pool1.database = mqtt

backend.pgsql.pool1.ssl = false

保持剩下部分的配置文件不變,然后需要啟動該插件。啟動插件的方式有 命令行控制台兩種方式,讀者可以任選其一。

通過命令行啟動

emqx_ctl plugins load emqx_backend_pgsql

通過管理控制台啟動

EMQ X 管理控制台 插件 頁面中,找到 emqx_backend_pgsql 插件,點擊 啟動

客戶端在線狀態存儲

客戶端上下線時,插件將更新在線狀態、上下線時間、節點客戶端列表至 PostgreSQL 數據庫。

數據表

創建 mqtt_client 設備在線狀態表:

CREATE TABLE mqtt_client(
  id SERIAL primary key,
  clientid character varying(100),
  state integer, -- 在線狀態: 0 離線 1 在線
  node character varying(100), -- 接入節點名稱
  online_at timestamp, -- 上線時間
  offline_at timestamp, -- 下線時間
  created timestamp without time zone,
  UNIQUE (clientid)
);

配置項

打開配置文件,配置 Backend 規則:

## hook: client.connected、client.disconnected
## action/function: on_client_connected、on_client_disconnected


## 客戶端上下線
backend.pgsql.hook.client.connected.1 = {"action": {"function": "on_client_connected"}, "pool": "pool1"}

## 客戶端下線
backend.pgsql.hook.client.disconnected.1 = {"action": {"function": "on_client_disconnected"}, "pool": "pool1"}

使用示例

瀏覽器打開 http://127.0.0.1:18083 EMQ X 管理控制台,在 工具 -> Websocket 中新建一個客戶端連接,指定 clientid 為 sub_client,點擊連接,連接成功后手動斷開:

image20181116105333637.png

查看 mqtt_client 表,此時將寫入 / 更新一條客戶端上下線記錄:

1.png

客戶端代理訂閱

客戶端上線時,存儲模塊直接從數據庫讀取預設待訂閱列表,代理加載訂閱主題。在客戶端需要通過預定主題通信(接收消息)場景下,應用能從數據層面設定 / 改變代理訂閱列表。

數據表

創建 mqtt_sub 代理訂閱關系表:

CREATE TABLE mqtt_sub(
  id SERIAL primary key,
  clientid character varying(100),
  topic character varying(200), -- topic
  qos integer, -- QoS
  created timestamp without time zone,
  UNIQUE (clientid, topic)
);

配置項

打開配置文件,配置 Backend 規則:

## hook: client.connected
## action/function: on_subscribe_lookup
backend.pgsql.hook.client.connected.2    = {"action": {"function": "on_subscribe_lookup"}, "pool": "pool1"}

使用示例

sub_client 設備上線時,需要為其訂閱 sub_client/upstreamsub_client/downlink 兩個 QoS 1 的主題:

  1. mqtt_sub 表中初始化插入代理訂閱主題信息:
insert into mqtt_sub(clientid, topic, qos) values('sub_client', 'sub_client/upstream', 1);

insert into mqtt_sub(clientid, topic, qos) values('sub_client', 'sub_client/downlink', 1);
  1. EMQ X 管理控制台 WebSocket 頁面,以 clientid sub_client 新建一個客戶端連接,切換至訂閱頁面,可見當前客戶端自動訂閱了 sub_client/upstreamsub_client/downlink 兩個 QoS 1 的主題:

image20181116110036523.png

  1. 切換回管理控制台 WebSocket 頁面,向 sub_client/downlink 主題發布消息,可在消息訂閱列表收到發布的消息。

持久化發布消息

數據表

創建 mqtt_msg MQTT 消息持久化表:

CREATE TABLE mqtt_msg (
  id SERIAL primary key,
  msgid character varying(60),
  sender character varying(100), -- 消息 pub 的 clientid
  topic character varying(200),
  qos integer,
  retain integer, -- 是否 retain 消息
  payload text,
  arrived timestamp without time zone -- 消息抵達時間(QoS > 0)
);

配置項

打開配置文件,配置 Backend 規則,支持使用 topic 參數進行消息過濾,此處使用 # 通配符存儲任意主題消息:

## hook: message.publish
## action/function: on_message_publish

backend.pgsql.hook.message.publish.1     = {"topic": "#", "action": {"function": "on_message_publish"}, "pool": "pool1"}

使用示例

在 EMQ X 管理控制台 WebSocket 頁面中,使用 clientdi sub_client 建立連接,向主題 upstream_topic 發布多條消息,EMQ X 將消息列表持久化至 mqtt_msg 表中:

websocket.png

暫只支持 QoS 1 2 的消息持久化。

Retain 消息持久化

表結構

創建 mqtt_retain Retain 消息存儲表:

CREATE TABLE mqtt_retain(
  id SERIAL primary key,
  topic character varying(200),
  msgid character varying(60),
  sender character varying(100),
  qos integer,
  payload text,
  arrived timestamp without time zone,
  UNIQUE (topic)
);

配置項

打開配置文件,配置 Backend 規則:

## 同時開啟以下規則,啟用 retain 持久化三個生命周期

## 發布非空 retain 消息時 (存儲)
backend.pgsql.hook.message.publish.2     = {"topic": "#", "action": {"function": "on_message_retain"}, "pool": "pool1"}

## 設備訂閱主題時查詢 retain 消息
backend.pgsql.hook.session.subscribed.2  = {"topic": "#", "action": {"function": "on_retain_lookup"}, "pool": "pool1"}

## 發布空 retain 消息時 (清除)
backend.pgsql.hook.message.publish.3     = {"topic": "#", "action": {"function": "on_retain_delete"}, "pool": "pool1"}

使用示例

在 EMQ X 管理控制台 WebSocket 頁面中建立連接后,發布消息勾選保留

image20181119111926675.png

發布(消息不為空)

非空的 retain 消息發布時,EMQ X 將以 topic 為唯一鍵,持久化該條消息至 mqtt_retain 表中,相同主題下發布不同的 retain 消息,只有最后一條消息會被持久化:

image20181119112306703.png

訂閱

客戶端訂閱 retain 主題后,EMQ X 將查詢 mqtt_retain 數據表,執行投遞 retain 消息操作。

發布(消息為空)

MQTT 協議中,發布空的 retain 消息將清空 retain 記錄,此時 retain 記錄將從 mqtt_retain 表中刪除。

消息確認持久化

開啟消息確認 (ACK) 持久化后,客戶端訂閱 QoS 1、QoS 2 級別的主題時,EMQ X 將在數據庫以 clientid + topic 為唯一鍵初始化 ACK 記錄。

數據表

創建 mqtt_acked 消息確認表:

CREATE TABLE mqtt_acked (
  id SERIAL primary key,
  clientid character varying(100),
  topic character varying(100),
  mid integer,
  created timestamp without time zone,
  UNIQUE (clientid, topic)
);

配置項

打開配置文件,配置 Backend 規則,可使用 topic 通配符 過濾要應用的消息:

## 訂閱時初始化 ACK 記錄
backend.pgsql.hook.session.subscribed.1  = {"topic": "#", "action": {"function": "on_message_fetch"}, "pool": "pool1"}


## 消息抵達時更新抵達狀態
backend.pgsql.hook.message.acked.1       = {"topic": "#", "action": {"function": "on_message_acked"}, "pool": "pool1"}

## 取消訂閱時刪除記錄行
backend.pgsql.hook.session.unsubscribed.1= {"topic": "#", "action": {"sql": ["delete from mqtt_acked where clientid = ${clientid} and topic = ${topic}"]}, "pool": "pool1"}

使用示例

在 EMQ X 管理控制台 WebSocket 頁面中建立連接后,訂閱 QoS > 0 的主題:

image20181119140251843.png

此時 mqtt_acked 表將插入初始化數據行,每向主題發布一條 QoS > 0 的消息,消息抵達后數據行 mid 將自增 1:

image20181119165248998.png

代理訂閱中滿足 QoS > 0 的 topic 也會初始化記錄,客戶端取消訂閱后相關記錄將被刪除。

自定義 SQL

除去插件內置函數、表結構外,emqx_backend_pgsql 還支持自定義 SQL 語句,通過使用如 ${clientid} 模板語法動態構造 SQL 語句實現如客戶端連接歷史、更新自定義數據表等操作。

SQL語句參數說明

hook 可用參數 示例(sql語句中${name} 表示可獲取的參數)
client.connected clientid insert into conn(clientid) values(${clientid})
client.disconnected clientid insert into disconn(clientid) values(${clientid})
session.subscribed clientid, topic, qos insert into sub(topic, qos) values(${topic}, ${qos})
session.unsubscribed clientid, topic delete from sub where topic = ${topic}
message.publish msgid, topic, payload, qos, clientid insert into msg(msgid, topic) values(${msgid}, ${topic})
message.acked msgid, topic, clientid insert into ack(msgid, topic) values(${msgid}, ${topic})
message.delivered msgid, topic, clientid insert into delivered(msgid, topic) values(${msgid}, ${topic})

更新自定義數據表示例

應用現有設備表 clients,具有設備連接認證、設備狀態記錄、設備管理等基本字段用於其他管理業務,現需要將 EMQ X 設備狀態同步至該表中:

CREATE TABLE "public"."clients" (
    "id" serial,
    "deviceUsername" varchar(50), --  MQTT username
    "client_id" varchar(50), -- MQTT client_id
    "password" varchar(50), -- MQTT password
    "is_super" boolean DEFAULT 'false', -- 是否 ACL super 客戶端
    "owner" int, -- 創建用戶
    "productID" int, -- 所屬產品
    "state" boolean DEFAULT 'false', -- 在線狀態
    PRIMARY KEY ("id")
);

-- 初始化系統中已存在示例數據,此時 state 為 false
INSERT INTO "public"."clients"("deviceUsername", "client_id", "password", "is_super", "owner", "productID", "state") VALUES('mqtt_10c61f1a1f47', 'mqtt_10c61f1a1f47', '9336EBF25087D91C818EE6E9EC29F8C1', TRUE, 1, 21, FALSE);

自定義 UPDATE SQL 語句:

## connected / disconnected hook 中配置自定義 UPDATE SQL
## 可以配置多條 SQL 語句 "SQL": ["sql_a", "sql_b", "sql_c"]

## 連接時
backend.pgsql.hook.client.connected.3 = {"action": {"sql": ["update clients set state = true where client_id = ${clientid}"]}, "pool": "pool1"}

## 斷開時
backend.pgsql.hook.client.disconnected.3 = {"action": {"sql": ["update clients set state = false where client_id = ${clientid}"]}, "pool": "pool1"}

客戶端上線時將填充並執行預定的 SQL 語句,更新設備在線狀態 state 字段為 true

image20181119170648517.png

高級選項

backend.pgsql.time_range = 5s

backend.pgsql.max_returned_count = 500

總結

讀者在理解了 PostgreSQL 中所存儲的數據結構、自定義 SQL 之后,可以結合 PostgreSQL 拓展相關應用。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM