共享訂閱
簡介
共享訂閱是在多個訂閱者之間實現負載均衡的訂閱方式:
[subscriber1] got msg1
msg1, msg2, msg3 /
[publisher] ----------------> "$share/g/topic" -- [subscriber2] got msg2
\
[subscriber3] got msg3
上圖中,共享 3 個 subscriber 用共享訂閱的方式訂閱了同一個主題 $share/g/topic
,其中topic
是它們訂閱的真實主題名,而 $share/g/
是共享訂閱前綴。EMQ X 支持兩種格式的共享訂閱前綴:
示例 | 前綴 | 真實主題名 |
---|---|---|
$queue/t/1 | $queue/ | t/1 |
$share/abc/t/1 | $share/abc | t/1 |
帶群組的共享訂閱
以 $share/<group-name>
為前綴的共享訂閱是帶群組的共享訂閱:
group-name 可以為任意字符串,屬於同一個群組內部的訂閱者將以負載均衡接收消息,但 EMQ X 會向不同群組廣播消息。
例如,假設訂閱者 s1,s2,s3 屬於群組 g1,訂閱者 s4,s5 屬於群組 g2。那么當 EMQ X 向這個主題發布消息 msg1 的時候:
- EMQ X 會向兩個群組 g1 和 g2 同時發送 msg1
- s1,s2,s3 中只有一個會收到 msg1
- s4,s5 中只有一個會收到 msg1
[s1]
msg1 /
[emqx] ------> "$share/g1/topic" - [s2] got msg1
| \
| [s3]
| msg1
----> "$share/g2/topic" -- [s4]
\
[s5] got msg1
下面對帶群組的共享訂閱進行測試:
添加兩個訂閱:


客戶端發送五條消息:

消息以負載均衡的方式發送到訂閱者:
不帶群組的共享訂閱
以 $queue/
為前綴的共享訂閱是不帶群組的共享訂閱。它是 $share
訂閱的一種特例,相當與所有訂閱者都在一個訂閱組里面:
[s1] got msg1
msg1,msg2,msg3 /
[emqx] ---------------> "$queue/topic" - [s2] got msg2
\
[s3] got msg3
測試:
添加兩個訂閱者:


發送五條消息,結果如下
均衡策略與派發 Ack 配置
EMQ X 的共享訂閱支持均衡策略與派發 Ack 配置:
# 均衡策略
broker.shared_subscription_strategy = random
# 適用於 QoS1 QoS2 消息,啟用時在其中一個組離線時,將派發給另一個組
broker.shared_dispatch_ack_enabled = false
均衡策略 | 描述 |
---|---|
random | 在所有訂閱者中隨機選擇 |
round_robin | 按照訂閱順序 |
sticky | 一直發往上次選取的訂閱者 |
hash | 按照發布者 ClientID 的哈希值 |
無論是單客戶端訂閱還是共享訂閱都要注意客戶端性能與消息接收速率,否則會引發消息堆積、客戶端崩潰等錯誤。
延遲發布
EMQ X 的延遲發布功能可以實現按照用戶配置的時間間隔延遲發布 PUBLISH 報文的功能。當客戶端使用特殊主題前綴 $delayed/{DelayInteval}
發布消息到 EMQ X 時,將觸發延遲發布功能。
延遲發布主題的具體格式如下:
$delayed/{DelayInterval}/{TopicName}
$delayed
: 使用$delay
作為主題前綴的消息都將被視為需要延遲發布的消息。延遲間隔由下一主題層級中的內容決定。{DelayInterval}
: 指定該 MQTT 消息延遲發布的時間間隔,單位是秒,允許的最大間隔是 4294967 秒。如果{DelayInterval}
無法被解析為一個整型數字,EMQ X 將丟棄該消息,客戶端不會收到任何信息。{TopicName}
: MQTT 消息的主題名稱。
例如:
$delayed/15/x/y
: 15 秒后將 MQTT 消息發布到主題x/y
。$delayed/60/a/b
: 1 分鍾后將 MQTT 消息發布到a/b
。$delayed/3600/$SYS/topic
: 1 小時后將 MQTT 消息發布到$SYS/topic
測試:
先開啟該功能:
然后重啟emqx服務。
添加一個監聽者監聽d1主題:
發送一個10秒延遲的消息
10秒后,接收到消息:
代理訂閱
EMQ X 的代理訂閱功能使得客戶端在連接建立時,不需要發送額外的 SUBSCRIBE 報文,便能自動建立用戶預設的訂閱關系。
內置代理訂閱
EMQ X 通過內置代理訂閱模塊就可以通過配置文件來指定代理訂閱規則從而實現代理訂閱,適用於有規律可循的靜態的代理訂閱需求。
代理訂閱功能由 emqx_mod_subscription
內置模塊提供,此功能默認關閉,支持在 EMQ X Broker 運行期間動態啟停
相關訂閱選項
-
服務質量( QoS )
服務端可以向客戶端發送的應用消息的最大 QoS 等級。
-
NL( No Local )
應用消息是否能夠被轉發到發布此消息的客戶端。
- NL 值為 0 時,表示應用消息可以被轉發給發布此消息的客戶端。
- NL 值為 1 時,表示應用消息不能被轉發給發布此消息的客戶端。
-
RAP( Retain As Published )
向此訂閱轉發應用消息時,是否保持消息被發布時設置的保留(RETAIN)標志。
- RAP 值為 0 時,表示向此訂閱轉發應用消息時把保留標志設置為 0。
- RAP 值為 1 時,表示向此訂閱轉發應用消息時保持消息被發布時設置的保留標志。
-
RH( Retain Handling )
當訂閱建立時,是否發送保留消息
- 0:訂閱建立時發送保留消息
- 1:訂閱建立時,若該訂閱當前不存在則發送保留消息
- 2:訂閱建立時不要發送保留消息
代理訂閱規則
代理訂閱規則的格式如下:
## 代理訂閱的主題
module.subscription.<number>.topic = <topic>
## 代理訂閱的訂閱選項:QoS
## 可選值: 0、1、2
## 默認值:1
module.subscription.<number>.qos = <qos>
## 代理訂閱的訂閱選項:No Local
## 可選值: 0、1
## 默認值:0
module.subscription.<number>.nl = <nl>
## 代理訂閱的訂閱選項:Retain As Published
## 可選值: 0、1
## 默認值:0
module.subscription.<number>.rap = <rap>
## 代理訂閱的訂閱選項:Retain Handling
## 可選值: 0、1、2
## 默認值:0
module.subscription.<number>.rh = <rh>
在配置代理訂閱的主題時,EMQ X 提供了 %c
和 %u
兩個占位符供用戶使用,EMQ X 會在執行代理訂閱時將配置中的 %c
和 %u
分別替換為客戶端的 Client ID
和 Username
,需要注意的是,%c
和 %u
必須占用一整個主題層級。
修改emqx.conf配置,並重啟emqx服務
module.subscription.1.topic = testtopic/%c
module.subscription.1.qos = 2
module.subscription.1.nl = 1
module.subscription.1.rap = 1
module.subscription.1.rh = 1
設置連接:
這里並沒有添加訂閱,發送消息到主題testtopic/%c,會自動添加代理訂閱,收到消息
在EMQ X Enterprise 版本中支持動態代理訂閱,即通過外部數據庫設置主題列表在設備連接時讀取列表實現代理訂閱。
主題重寫
EMQ X 的主題重寫功能支持根據用戶配置的規則在客戶端訂閱主題、發布消息、取消訂閱的時候將 A 主題重寫為 B 主題。
EMQ X 的保留消息 和 延遲發布可以與主題重寫配合使用,例如,當用戶想使用延遲發布功能,但不方便修改客戶端發布的主題時,可以使用主題重寫將相關主題重寫為延遲發布的主題格式。
由於 ACL 檢查會在主題重寫之前執行,所以只要確保重寫之前的主題能夠通過 ACL 檢查即可。
啟停主題重寫功能
主題重寫功能由 emqx_mod_rewrite
內置模塊提供, 此功能默認關閉,支持在 EMQ X Broker 運行期間動態啟停
配置主題重寫規則
EMQ X 的主題重寫規則需要用戶自行配置,用戶可以自行添加多條主題重寫規則,規則的數量沒有限制,但由於任何攜帶主題的 MQTT 報文都需要匹配一遍重寫規則,因此此功能在高吞吐場景下帶來的性能損耗與規則數量是成正比的,用戶需要謹慎地使用此功能。
每條主題重寫規則的格式如下:
module.rewrite.pub.rule.<number> = 主題過濾器 正則表達式 目標表達式
module.rewrite.sub.rule.<number> = 主題過濾器 正則表達式 目標表達式
重寫規則分為 Pub 規則和 Sub 規則,Pub 規則匹配 PUSHLISH 報文攜帶的主題,Sub 規則匹配 SUBSCRIBE、UNSUBSCRIBE 報文攜帶的主題。
每條重寫規則都由以空格分隔的主題過濾器、正則表達式、目標表達式三部分組成。在主題重寫功能開啟的前提下,EMQ X 在收到諸如 PUBLISH 報文等帶有主題的 MQTT 報文時,將使用報文中的主題去依次匹配配置文件中規則的主題過濾器部分,一旦成功匹配,則使用正則表達式提取主題中的信息,然后替換至目標表達式以構成新的主題。
目標表達式中可以使用 $N
這種格式的變量匹配正則表達中提取出來的元素,$N
的值為正則表達式中提取出來的第 N 個元素,比如 $1
即為正則表達式提取的第一個元素。
需要注意的是,EMQ X 使用倒序讀取配置文件中的重寫規則,當一條主題可以同時匹配多條主題重寫規則的主題過濾器時,EMQ X 僅會使用它匹配到的第一條規則進行重寫,如果該條規則中的正則表達式與 MQTT 報文主題不匹配,則重寫失敗,不會再嘗試使用其他的規則進行重寫。因此用戶在使用時需要謹慎的設計 MQTT 報文主題以及主題重寫規則。
示例
假設 emqx.conf
文件中已經添加了以下主題重寫規則,修改完成后,重啟emqx服務
module.rewrite.sub.rule.1 = y/+/z/# ^y/(.+)/z/(.+)$ y/z/$2
module.rewrite.pub.rule.1 = x/# ^x/y/(.+)$ z/y/x/$1
module.rewrite.pub.rule.2 = x/y/+ ^x/y/(\d+)$ z/y/$1
此時我們分別訂閱 y/a/z/b
、y/def
、x/1/2
、x/y/2
、x/y/z
五個主題:
- 當客戶端訂閱
y/def
主題時,y/def
不匹配任何一個主題過濾器,因此不執行主題重寫,直接訂閱y/def
主題。 - 當客戶端訂閱
y/a/z/b
主題時,y/a/z/b
匹配y/+/z/#
主題過濾器,EMQ X 執行module.rewrite.sub.rule.1
規則,通過正則正則表達式匹配出元素[a、b]
,將匹配出來的第二個元素帶入y/z/$2
,實際訂閱了y/z/b
主題。 - 當客戶端向
x/1/2
主題發送消息時,x/1/2
匹配x/#
主題過濾器,EMQ X 執行module.rewrite.pub.rule.1
規則,通過正則表達式未匹配到元素,不執行主題重寫,因此直接向x/1/2
主題發送消息。 - 當客戶端向
x/y/2
主題發送消息時,x/y/2
同時匹配x/#
和x/y/+
兩個主題過濾器,EMQ X 通過倒序讀取配置,所以優先匹配module.rewrite.pub.rule.2
,通過正則替換,實際向z/y/2
主題發送消息。 - 當客戶端向
x/y/z
主題發送消息時,x/y/z
同時匹配x/#
和x/y/+
兩個主題過濾器,EMQ X 通過倒序讀取配置,所以優先匹配module.rewrite.pub.rule.2,通過正則表達式未匹配到元素,不執行主題重寫,直接向
x/y/z主題發送消息。需要注意的是,即使
module.rewrite.pub.rule.2的正則表達式匹配失敗,也不會再次去匹配
module.rewrite.pub.rule.1` 的規則。
測試:
我這里向y/z/b主題發送消息

訂閱了y/a/z/b主題的收到了消息,說明主題重寫生效。

黑名單
EMQ X 為用戶提供了黑名單功能,用戶可以通過相關的 HTTP API 將指定客戶端加入黑名單以拒絕該客戶端訪問,除了客戶端標識符以外,還支持直接封禁用戶名甚至 IP 地址。
黑名單只適用於少量客戶端封禁需求,如果有大量客戶端需要認證管理,請使用 認證功能。
在黑名單功能的基礎上,EMQ X 支持自動封禁那些被檢測到短時間內頻繁登錄的客戶端,並且在一段時間內拒絕這些客戶端的登錄,以避免此類客戶端過多占用服務器資源而影響其他客戶端的正常使用。
需要注意的是,自動封禁功能只封禁客戶端標識符,並不封禁用戶名和 IP 地址,即該機器只要更換客戶端標識符就能夠繼續登錄。
此功能默認關閉,用戶可以在 emqx.conf
配置文件中將 enable_flapping_detect
配置項設為 on
以啟用此功能。
zone.external.enable_flapping_detect = off
用戶可以為此功能調整觸發閾值和封禁時長,對應配置項如下:
flapping_detect_policy = 30, 1m, 5m
此配置項的值以 ,
分隔,依次表示客戶端離線次數,檢測的時間范圍以及封禁時長,因此上述默認配置即表示如果客戶端在 1 分鍾內離線次數達到 30 次,那么該客戶端使用的客戶端標識符將被封禁 5 分鍾
黑名單 HTTP API
GET /api/v4/banned
獲取黑名單
Query String Parameters:
同 /api/v4/clients
。
Success Response Body (JSON):
Name | Type | Description |
---|---|---|
code | Integer | 0 |
data | Array | 由對象構成的數組,對象中的字段與 POST 方法中的 Request Body 相同 |
meta | Object | 同 /api/v4/clients |
Examples:
獲取黑名單列表:
$ curl -i --basic -u admin:public -vX GET "http://localhost:8081/api/v4/banned"
{"meta":{"page":1,"limit":10000,"count":1},"data":[{"who":"example","until":1582265833,"reason":"undefined","by":"user","at":1582265533,"as":"clientid"}],"code":0}
POST /api/v4/banned
將對象添加至黑名單
Parameters (json):
Name | Type | Required | Default | Description |
---|---|---|---|---|
who | String | Required | 添加至黑名單的對象,可以是客戶端標識符、用戶名和 IP 地址 | |
as | String | Required | 用於區分黑名單對象類型,可以是 clientid ,username ,peerhost |
|
reason | String | Required | 詳細信息 | |
by | String | Optional | user | 指示該對象被誰添加至黑名單 |
at | Integer | Optional | 當前系統時間 | 添加至黑名單的時間,單位:秒 |
until | Integer | Optional | 當前系統時間 + 5 分鍾 | 何時從黑名單中解除,單位:秒 |
Success Response Body (JSON):
Name | Type | Description |
---|---|---|
code | Integer | 0 |
data | Object | 與傳入的 Request Body 相同 |
Examples:
將 client 添加到黑名單:
$ curl -i --basic -u admin:public -vX POST "http://localhost:8081/api/v4/banned" -d '{"who":"example","as":"clientid","reason":"example"}'
{"data":{"who":"example","as":"clientid"},"code":0}
DELETE /api/v4/banned/{as}/{who}
將對象從黑名單中刪除
Parameters: 無
Success Response Body (JSON):
Name | Type | Description |
---|---|---|
code | Integer | 0 |
message | String | 僅在發生錯誤時返回,用於提供更詳細的錯誤信息 |
xamples:
將 client 從黑名單中移除:
$ curl -i --basic -u admin:public -X DELETE "http://localhost:8081/api/v4/banned/clientid/example"
{"code":0}
速率限制
簡介
EMQ X 提供對接入速度、消息速度的限制:當客戶端連接請求速度超過指定限制的時候,暫停新連接的建立;當消息接收速度超過指定限制的時候,暫停接收消息。
速率限制是一種 backpressure 方案,從入口處避免了系統過載,保證了系統的穩定和可預測的吞吐。速率限制可在 emqx.conf
中配置:
配置項 | 類型 | 默認值 | 描述 |
---|---|---|---|
listener.tcp.external.max_conn_rate | Number | 1000 | 本節點上允許的最大連接速率 (conn/s) |
zone.external.rate_limit.conn_messagess_in | Number,Duration | 無限制 | 單連接上允許的最大發布速率 (msg/s) |
zone.external.rate_limit.conn_bytes_in | Size,Duration | 無限制 | 單連接上允許的最大報文速率 (bytes/s) |
- max_conn_rate 是單個 emqx 節點上連接建立的速度限制。
1000
代表秒最多允許 1000 個客戶端接入。 - conn_messages_in 是單個連接上接收 PUBLISH 報文的速率限制。
100,10s
代表每個連接上允許收到的最大 PUBLISH 消息速率是每 10 秒 100 個。 - conn_bytes_in 是單個連接上接收 TCP數據包的速率限制。
100KB,10s
代表每個連接上允許收到的最大 TCP 報文速率是每 10 秒 100KB。
conn_messages_in
和 conn_bytes_in
提供的都是針對單個連接的限制,EMQ X 目前沒有提供全局的消息速率限制。
速率限制原理
EMQ X 使⽤令牌桶 (Token Bucket)算法來對所有的 Rate Limit 來做控制。 令牌桶算法 的邏輯如下圖:

- 存在一個可容納令牌(Token) 的最大值 burst 的桶(Bucket),最大值 burst 簡記為 b 。
- 存在一個 rate 為每秒向桶添加令牌的速率,簡記為 r 。當桶滿時則不不再向桶中加⼊入令牌。
- 每當有 1 個(或 N 個)請求抵達時,則從桶中拿出 1 個 (或 N 個) 令牌。如果令牌不不夠則阻塞,等待令牌的⽣生成。
由此可知該算法中:
-
長期來看,所限制的請求速率的平均值等於 rate 的值。
-
記實際請求達到速度為 M,且 M > r,那么,實際運⾏中能達到的最大(峰值)速率為 M = b + r,證明:
容易想到,最大速率 M 為:能在1個單位時間內消耗完滿狀態令牌桶的速度。而桶中令牌的消耗速度為 M - r,故可知:b / (M - r) = 1,得 M = b + r
令牌桶算法在 EMQ X 中的應用
當使用如下配置做報文速率限制的時候:
zone.external.rate_limit.conn_bytes_in = 100KB,10s
EMQ X 將使用兩個值初始化每個連接的 rate-limit 處理器:
- rate = 100 KB / 10s = 10240 B/s
- burst = 100 KB = 102400 B
根據 消息速率限制原理 中的算法,可知:
- 長期來看允許的平均速率限制為 10240 B/s
- 允許的峰值速率為 102400 + 10240 = 112640 B/s
為提高系統吞吐,EMQ X 的接入模塊不會一條一條的從 socket 讀取報文,而是每次從 socket 讀取 N 條報文。rate-limit 檢查的時機就是在收到這 N 條報文之后,准備繼續收取下個 N 條報文之前。故實際的限制速率不會如算法一樣精准。EMQ X 只提供了一個大概的速率限制。N
的值可以在 etc/emqx.conf
中配置:
配置項 | 類型 | 默認值 | 描述 |
---|---|---|---|
listener.tcp.external.active_n | Number | 100 | emqx 每次從 TCP 棧讀取多少條消息 |
飛行窗口和消息隊列
簡介
為了提高消息吞吐效率和減少網絡波動帶來的影響,EMQ X 允許多個未確認的 QoS 1 和 QoS 2 報文同時存在於網路鏈路上。這些已發送但未確認的報文將被存放在 Inflight Window 中直至完成確認。
當網絡鏈路中同時存在的報文超出限制,即 Inflight Window 到達長度限制(見 max_inflight
)時,EMQ X 將不再發送后續的報文,而是將這些報文存儲在 Message Queue 中。一旦 Inflight Window 中有報文完成確認,Message Queue 中的報文就會以先入先出的順序被發送,同時存儲到 Inflight Window 中。
當客戶端離線時,Message Queue 還會被用來存儲 QoS 0 消息,這些消息將在客戶端下次上線時被發送。這功能默認開啟,當然你也可以手動關閉,見 mqueue_store_qos0
。
需要注意的是,如果 Message Queue 也到達了長度限制,后續的報文將依然緩存到 Message Queue,但相應的 Message Queue 中最先緩存的消息將被丟棄。如果隊列中存在 QoS 0 消息,那么將優先丟棄 QoS 0 消息。因此,根據你的實際情況配置一個合適的 Message Queue 長度限制(見 max_mqueue_len
)是非常重要的。
飛行隊列與 Receive Maximum
MQTT v5.0 協議為 CONNECT 報文新增了一個 Receive Maximum
的屬性,官方對它的解釋是:
客戶端使用此值限制客戶端願意同時處理的 QoS 為 1 和 QoS 為 2 的發布消息最大數量。沒有機制可以限制服務端試圖發送的 QoS 為 0 的發布消息 。
也就是說,服務端可以在等待確認時使用不同的報文標識符向客戶端發送后續的 PUBLISH 報文,直到未被確認的報文數量到達 Receive Maximum
限制。
不難看出,Receive Maximum
其實與 EMQ X 中的 Inflight Window 機制如出一轍,只是在 MQTT v5.0 協議發布前,EMQ X 就已經對接入的 MQTT 客戶端提供了這一功能。現在,使用 MQTT v5.0 協議的客戶端將按照 Receive Maximum
的規范來設置 Inflight Window 的最大長度,而更低版本 MQTT 協議的客戶端則依然按照配置來設置。
配置項
配置項 | 類型 | 可取值 | 默認值 | 說明 |
---|---|---|---|---|
max_inflight | integer | >= 0 | 32 (external), 128 (internal) | Inflight Window 長度限制,0 即無限制 |
max_mqueue_len | integer | >= 0 | 1000 (external), 10000 (internal) | Message Queue 長度限制,0 即無限制 |
mqueue_store_qos0 | enum | true , false |
true | 客戶端離線時 EMQ X 是否存儲 QoS 0 消息至 Message Queue |
消息重傳
簡介
消息重傳 (Message Retransmission) 是屬於 MQTT 協議標准規范的一部分。
協議中規定了作為通信的雙方 服務端 和 客戶端 對於自己發送到對端的 PUBLISH 消息都應滿足其 服務質量 (Quality of Service levels) 的要求。如:
- QoS 1:表示 消息至少送達一次 (At least once delivery);即發送端會一直重發該消息,除非收到了對端對該消息的確認。意思是在 MQTT 協議的上層(即業務的應用層)相同的 QoS 1 消息可能會收到多次。
- QoS 2:表示 消息只送達一次 (Exactly once delivery);即該消息在上層僅會接收到一次。
雖然,QoS 1 和 QoS 2 的 PUBLISH 報文在 MQTT 協議棧這一層都會發生重傳,但請你謹記的是:
- QoS 1 消息發生重傳后,在 MQTT 協議棧上層,也會收到這些重發的 PUBLISH 消息。
- QoS 2 消息無論如何重傳,最終在 MQTT 協議棧上層,都只會收到一條 PUBLISH 消息
基礎配置
有兩種場景會導致消息重發:
- PUBLISH 報文發送給對端后,規定時間內未收到應答。則重發這個報文。
- 在保持會話的情況下,客戶端重連后;EMQ X 會自動重發 未應答的消息,以確保 QoS 流程的正確。
在 etc/emqx.conf
中可配置:
配置項 | 類型 | 可取值 | 默認值 | 說明 |
---|---|---|---|---|
retry_interval | duration | - | 30s | 等待一個超時間隔,如果沒收到應答則重傳消息 |
一般來說,你只需要關心以上內容就足夠了。