MQTTX Project
1 介紹
mqttx
基於 mqtt v3.1.1 官方協議文檔開發。
項目地址:https://github.com/Amazingwujun/mqttx
項目運行說明:
-
使用
springboot
推薦的啟動方式java -jar app.jar
,使用mvn clean package
打包,這種方式需要修改配置文件(resources/application.yml
)中 redis 地址和端口。mqttx default redis 連接使用的配置:
localhost:6379
, 無密碼配置項見 6.1 配置項
-
基於
docker
容器化部署,這個就比較簡單,具體的步驟見 容器化部署
中間件依賴:
- [x] redis
其他依賴:
- 項目使用了 lombok,使用 ide 請安裝對應的插件
舉例:idea 需要安裝插件 Lombok, settings > Build,Execution,Deployment > Compiler > Annotation Processor 開啟 Enable annotation processing
我在雲端部署了一個 mqttx
單例服務,可供功能測試:
- 不支持 ssl
- 開啟了 websocket, 可通過 http://tools.emqx.io/ 測試,僅需將域名修改為:
119.45.158.51
(端口、地址不變) - 支持 共享訂閱功能
- 部署版本
v1.0.4.RELEASE
2 架構
mqttx
支持客戶端認證、topic 發布/訂閱鑒權功能,如果需要配套使用,建議的架構如下圖:
客戶認證服務由使用者自行實現
內部實現框架關系(僅列出關鍵項):
2.1 目錄結構
├─java
│ └─com
│ └─jun
│ └─mqttx
│ ├─broker mqtt 協議實現及處理包
│ │ ├─codec 編解碼
│ │ └─handler 消息處理器(pub, sub, connn, etc)
│ ├─config 配置,主要是 bean 聲明
│ ├─constants 常量
│ ├─consumer 集群消息消費者
│ ├─entity 實體類
│ ├─exception 異常類
│ ├─service 業務服務(用戶認證, 消息存儲等)接口
│ │ └─impl 默認實現
│ └─utils 工具類
└─resources 資源文件(application.yml 在此文件夾)
└─tls ca 存放地址
3 容器化部署
為了方便項目快速的部署,引進 docker
- 執行本地部署動作前,需要先下載docker。
- docker-compose 文件中寫死了端口映射(
1883, 8083
), 如果你修改了mqttx
的端口配置,則docker-compose.yml
中也應修改
- 通過IDE提供的打包功能將項目打包為 target/*.jar
- 進入 dockerfile 同級目錄,執行
docker build -t mqttx:v1.0.4.RELEASE .
- 執行 docker-compose up
4 功能說明
4.1 qos 支持
- [x] qos0
- [x] qos1
- [x] qos2
為支持 qos1、qos2,引入 redis
作為持久層,這部分已經封裝成接口,可自行替換實現(比如采用 mysql
)。
4.2 topicFilter 支持
- 支持多級通配符
#
與單級通配符+
,不支持通配符$
- 不支持以
/
結尾的topic,比如 a/b/,請改為 a/b。 - 其它規則見 mqtt v3.1.1 4.7 Topic Names and Topic Filters
ps:實際上 mqttx 僅對訂閱 topicFilter 進行校驗,publish 的 topic 是沒有做合法性檢查的。
當 topic 安全功能開啟后,客戶端只允許發布消息到被授權的主題
舉例:
topicFilter | match topics |
---|---|
/a/b/+ | /a/b/abc, /a/b/test |
a/b/# | a/b, a/b/abc, a/b/c/def |
a/+/b/# | a/nani/b/abc |
/+/a/b/+/c | /aaa/a/b/test/c |
校驗工具類為:com.jun.mqttx.utils.TopicUtils
4.3 集群支持
項目引入 redis pub/sub
分發消息以支持集群功能。如果需要修改為 kafka
或其它 mq ,需要修改配置類 ClusterConfig
及替換實現類 InternalMessageServiceImpl
。
mqttx.cluster.enable
:功能開關,默認false
4.4 ssl 支持
開啟 ssl 你首先應該有了 ca(自簽名或購買),然后修改 application.yml
文件中幾個配置:
mqttx.ssl.enable
:功能開關,默認false
,同時控制websocket
與socket
mqttx.ssl.key-store-location
: 證書地址,基於classpath
mqttx.ssl.key-store-password
: 證書密碼mqttx.ssl.key-store-type
: keystore 類別,如PKCS12
resources/tls
目錄中的mqttx.keystore
僅供測試使用, 密碼:123456
4.5 topic 安全支持
為了對 client 訂閱 topic 進行限制,加入 topic 訂閱&發布鑒權機制:
mqttx.enable-topic-sub-pub-secure
: 功能開關,默認false
- 使用時需要實現接口
AuhenticationService
,該接口返回對象中含有authorizedSub,authorizedPub
存儲 client 被授權訂閱及發布的topic
列表。 - broker 在消息訂閱及發布都會校驗客戶端權限
支持的主題類型:
- [x] 普通主題
- [x] 共享主題
- [x] 系統主題
4.6 共享主題支持
共享訂閱是 mqtt5
協議規定的內容,很多 mq(例如 kafka
) 都有實現。
mqttx.share-topic.enable
: 功能開關,默認true
- 格式:
$share/{ShareName}/{filter}
,$share
為前綴,ShareName
為共享訂閱名,filter
就是非共享訂閱主題過濾器。 - 目前支持
hash
,random
,round
三種規則
下圖展示了共享主題與常規主題之間的差異:
msg-a
消息分發策略取決於配置項mqttx.share-topic.share-sub-strategy
可以配合
cleanSession = 1
的會話,共享主題的客戶端斷開連接后會被服務端移除訂閱,這樣共享主題的消息只會分發給在線的客戶端。
注意:mqtt3.1.1
協議規定當cleanSession = 1
時,連接斷開后與會話相關聯的所有狀態(不含 retained 消息)都會被刪除(mqtt5
增加了會話超時設置,感興趣的同學可以了解一下)。
mqttx v1.0.5.BETA
版本后(含),cleanSession = 1
的會話消息保存在內存中,具備極高的性能.If CleanSession is set to 1, the Client and Server MUST discard any previous Session and start a new one. This Session lasts as long as the Network Connection. State data associated with this Session MUST NOT be reused in any subsequent Session [MQTT-3.1.2-6].
The Session state in the Client consists of:
- QoS 1 and QoS 2 messages which have been sent to the Server, but have not been completely acknowledged.
- QoS 2 messages which have been received from the Server, but have not been completely acknowledged.
The Session state in the Server consists of:
- The existence of a Session, even if the rest of the Session state is empty.
- The Client’s subscriptions.
- QoS 1 and QoS 2 messages which have been sent to the Client, but have not been completely acknowledged.
- QoS 1 and QoS 2 messages pending transmission to the Client.
- QoS 2 messages which have been received from the Client, but have not been completely acknowledged.
- Optionally, QoS 0 messages pending transmission to the Client.
4.7 websocket 支持
支持
4.8 系統主題
客戶端可通過訂閱系統主題獲取 broker 狀態,目前系統支持如下主題:
topic | repeat | comment |
---|---|---|
$SYS/broker/status |
false |
訂閱此主題的客戶端會定期(mqttx.sys-topic.interval )收到 broker 的狀態,該狀態涵蓋下面所有主題的狀態值. 注意:客戶端連接斷開后,訂閱取消 |
$SYS/broker/activeConnectCount |
true |
立即返回當前的活動連接數量 |
$SYS/broker/time |
true |
立即返回當前時間戳 |
$SYS/broker/version |
true |
立即返回 broker 版本 |
repeat
:
repeat = false
: 只需訂閱一次,broker 會定時發布數據到此主題.repeat = true
: 訂閱一次,broker 發布一次,可多次訂閱.注意:
- topic 安全機制 同樣會影響客戶端訂閱系統主題, 未授權客戶端將無法訂閱系統主題
- 系統主題訂閱不會持久化
響應對象格式為 json
字符串:
{
"activeConnectCount": 2,
"timestamp": "2020-09-18 15:13:46",
"version": "1.0.5.ALPHA"
}
field | 說明 |
---|---|
activeConnectCount |
當前活躍連接數量 |
timestamp |
時間戳;(yyyy-MM-dd HH:mm:ss ) |
version |
mqttx 版本 |
5 路線圖
基於我個人的認知,mqttx
接下來可能的開發計划:
- 集群態考慮整合服務注冊的功能,便於管理集群狀態,可能會使用
consul
,做不做看我后面的想法吧 - bug fix and optimization,這個會一直繼續的,不過主要靠使用和學習
mqttx
的同學反饋問題給我(沒反饋我就當沒有唄~攤手.jpg) - 目前正在開發基於
vue2.0
,element-ui
的 mqttx-admin 管理平台,mqttx
的功能更新會暫停一段時間(最近在看 mqtt5)。
項目開發過程中發現需要對mqttx
做一些改動,但這些改動不應該 push 給 mqttx master(比如 topic 安全認證這個功能需要配合mqttx-platform
,我可能會引入 Retrofit 處理接口調用,其實可以用feign
,我覺的這兩個都差不多),我應該會開一個業務 branch 處理這個事情。話說javascript
寫項目可太爽了,以前怎么不覺得? mqttx
還沒壓測過,算了,看心情吧~ (有同學幫忙不?)netty 4.1.52.Final 支持了 mqtt5
,em...
任何問題,請聯系我。郵箱:85998282@qq.com.
6 附表
6.1 配置項
配置 | 默認值 | 說明 |
---|---|---|
mqttx.version |
取自 pom.xml |
版本 |
mqttx.brokerId |
1 |
應用標志, 唯一 |
mqttx.heartbeat |
50s |
初始心跳,會被 conn 消息中的 keepalive 重置 |
mqttx.host |
0.0.0.0 |
監聽地址 |
mqttx.soBacklog |
512 |
tcp 連接處理隊列 |
mqttx.enableTopicSubPubSecure |
false |
客戶訂閱/發布主題安全功能,開啟后將限制客戶端發布/訂閱的主題 |
mqttx.enableInnerCache |
true |
發布消息每次都需要查詢 redis 來獲取訂閱的客戶端列表。開啟此功能后,將在內存中建立一個主題-客戶端關系映射, 應用直接訪問內存中的數據即可 |
mqttx.redis.clusterSessionHashKey |
mqttx.session.key |
redis map key;用於集群的會話存儲 |
mqttx.redis.topicPrefix |
mqttx:topic: |
主題前綴; topic <--> client 映射關系保存 |
mqttx.redis.retainMessagePrefix |
mqttx:retain: |
保留消息前綴, 保存 retian 消息 |
mqttx.redis.pubMsgSetPrefix |
mqttx:client:pubmsg: |
client pub消息 redis set 前綴; 保存 pubmsg,當收到 puback 獲取 pubrec 后刪除 |
mqttx.redis.pubRelMsgSetPrefix |
mqttx:client:pubrelmsg: |
client pubRel 消息 redis set 前綴;保存 pubrel 消息,收到 pubcom 消息刪除 |
mqttx.redis.topicSetKey |
mqttx:alltopic |
topic 集合,redis set key 值;保存所有的主題 |
mqttx.cluster.enable |
false |
集群開關 |
mqttx.cluster.innerCacheConsistancyKey |
mqttx:cache_consistence |
應用啟動后,先查詢 redis 中無此 key 值,然后在檢查一致性 |
mqttx.ssl.enable |
false |
ssl 開關 |
mqttx.ssl.keyStoreLocation |
classpath: tls/mqttx.keystore |
keyStore 位置 |
mqttx.ssl.keyStorePassword |
用戶手動配置 |
keyStore 密碼 |
mqttx.ssl.keyStoreType |
pkcs12 |
keyStore 類別 |
mqttx.socket.enable |
true |
socket 開關 |
mqttx.socket.port |
1883 |
socket 監聽端口 |
mqttx.websocket.enable |
false |
websocket 開關 |
mqttx.websocket.port |
8083 |
websocket 監聽端口 |
mqttx.websocket.path |
/mqtt |
websocket path |
mqttx.share-topic.enable |
true |
共享主題功能開關 |
mqttx.share-topic.share-sub-strategy |
round |
負載均衡策略, 目前支持隨機、輪詢、哈希 |
mqttx.sys-topic.enable |
false |
系統主題功能開關 |
mqttx.sys-topic.interval |
60s |
定時發布間隔 |
mqttx.sys-topic.qos |
0 |
主題 qos. |