開源的 mqtt broker,易於使用和學習


MQTTX Project

license language

1 介紹

mqttx 基於 mqtt v3.1.1 官方協議文檔開發。
項目地址:https://github.com/Amazingwujun/mqttx
項目運行說明:

  1. 使用springboot推薦的啟動方式 java -jar app.jar,使用 mvn clean package 打包,這種方式需要修改配置文件(resources/application.yml)中 redis 地址和端口。

    mqttx default redis 連接使用的配置:localhost:6379無密碼

    配置項見 6.1 配置項

  2. 基於 docker 容器化部署,這個就比較簡單,具體的步驟見 容器化部署

中間件依賴:

  • [x] redis

其他依賴:

  1. 項目使用了 lombok,使用 ide 請安裝對應的插件

舉例:idea 需要安裝插件 Lombok, settings > Build,Execution,Deployment > Compiler > Annotation Processor 開啟 Enable annotation processing

我在雲端部署了一個 mqttx 單例服務,可供功能測試:

  1. 不支持 ssl
  2. 開啟了 websocket, 可通過 http://tools.emqx.io/ 測試,僅需將域名修改為:119.45.158.51(端口、地址不變)
  3. 支持 共享訂閱功能
  4. 部署版本 v1.0.4.RELEASE

websocket

2 架構

mqttx支持客戶端認證、topic 發布/訂閱鑒權功能,如果需要配套使用,建議的架構如下圖:

架構圖

客戶認證服務由使用者自行實現

內部實現框架關系(僅列出關鍵項):

ak6mB6.png

2.1 目錄結構

├─java
│  └─com
│      └─jun
│          └─mqttx
│              ├─broker         mqtt 協議實現及處理包
│              │  ├─codec       編解碼
│              │  └─handler     消息處理器(pub, sub, connn, etc)
│              ├─config         配置,主要是 bean 聲明
│              ├─constants      常量
│              ├─consumer       集群消息消費者
│              ├─entity         實體類
│              ├─exception      異常類
│              ├─service        業務服務(用戶認證, 消息存儲等)接口
│              │  └─impl        默認實現
│              └─utils          工具類
└─resources                     資源文件(application.yml 在此文件夾)
    └─tls                       ca 存放地址

3 容器化部署

為了方便項目快速的部署,引進 docker

  1. 執行本地部署動作前,需要先下載docker。
  2. docker-compose 文件中寫死了端口映射(1883, 8083), 如果你修改了 mqttx 的端口配置,則 docker-compose.yml 中也應修改
  1. 通過IDE提供的打包功能將項目打包為 target/*.jar
  2. 進入 dockerfile 同級目錄,執行 docker build -t mqttx:v1.0.4.RELEASE .
  3. 執行 docker-compose up

4 功能說明

4.1 qos 支持

  • [x] qos0
  • [x] qos1
  • [x] qos2

為支持 qos1、qos2,引入 redis 作為持久層,這部分已經封裝成接口,可自行替換實現(比如采用 mysql)。

4.2 topicFilter 支持

  1. 支持多級通配符 #與單級通配符 +,不支持通配符 $
  2. 不支持以 /結尾的topic,比如 a/b/,請改為 a/b。
  3. 其它規則見 mqtt v3.1.1 4.7 Topic Names and Topic Filters

ps:實際上 mqttx 僅對訂閱 topicFilter 進行校驗,publish 的 topic 是沒有做合法性檢查的。

當 topic 安全功能開啟后,客戶端只允許發布消息到被授權的主題

舉例:

topicFilter match topics
/a/b/+ /a/b/abc, /a/b/test
a/b/# a/b, a/b/abc, a/b/c/def
a/+/b/# a/nani/b/abc
/+/a/b/+/c /aaa/a/b/test/c

校驗工具類為:com.jun.mqttx.utils.TopicUtils

4.3 集群支持

項目引入 redis pub/sub 分發消息以支持集群功能。如果需要修改為 kafka 或其它 mq ,需要修改配置類 ClusterConfig 及替換實現類 InternalMessageServiceImpl

ak6nHK.png

  1. mqttx.cluster.enable:功能開關,默認 false

4.4 ssl 支持

開啟 ssl 你首先應該有了 ca(自簽名或購買),然后修改 application.yml 文件中幾個配置:

  1. mqttx.ssl.enable:功能開關,默認 false,同時控制 websocketsocket
  2. mqttx.ssl.key-store-location: 證書地址,基於 classpath
  3. mqttx.ssl.key-store-password: 證書密碼
  4. mqttx.ssl.key-store-type: keystore 類別,如 PKCS12

resources/tls 目錄中的 mqttx.keystore 僅供測試使用, 密碼: 123456

4.5 topic 安全支持

為了對 client 訂閱 topic 進行限制,加入 topic 訂閱&發布鑒權機制:

  1. mqttx.enable-topic-sub-pub-secure: 功能開關,默認 false
  2. 使用時需要實現接口 AuhenticationService ,該接口返回對象中含有 authorizedSub,authorizedPub 存儲 client 被授權訂閱及發布的 topic 列表。
  3. broker 在消息訂閱及發布都會校驗客戶端權限

支持的主題類型:

  • [x] 普通主題
  • [x] 共享主題
  • [x] 系統主題

4.6 共享主題支持

共享訂閱是 mqtt5 協議規定的內容,很多 mq(例如 kafka) 都有實現。

  1. mqttx.share-topic.enable: 功能開關,默認 true
  2. 格式: $share/{ShareName}/{filter}, $share 為前綴, ShareName 為共享訂閱名, filter 就是非共享訂閱主題過濾器。
  3. 目前支持 hash, random, round 三種規則

下圖展示了共享主題與常規主題之間的差異:

share-topic

msg-a 消息分發策略取決於配置項 mqttx.share-topic.share-sub-strategy

可以配合 cleanSession = 1 的會話,共享主題的客戶端斷開連接后會被服務端移除訂閱,這樣共享主題的消息只會分發給在線的客戶端。
注意: mqtt3.1.1 協議規定當 cleanSession = 1 時,連接斷開后與會話相關聯的所有狀態(不含 retained 消息)都會被刪除(mqtt5 增加了會話超時設置,感興趣的同學可以了解一下)。
mqttx v1.0.5.BETA 版本后(含),cleanSession = 1 的會話消息保存在內存中,具備極高的性能.

If CleanSession is set to 1, the Client and Server MUST discard any previous Session and start a new one. This Session lasts as long as the Network Connection. State data associated with this Session MUST NOT be reused in any subsequent Session [MQTT-3.1.2-6].

The Session state in the Client consists of:

  • QoS 1 and QoS 2 messages which have been sent to the Server, but have not been completely acknowledged.
  • QoS 2 messages which have been received from the Server, but have not been completely acknowledged.

The Session state in the Server consists of:

  • The existence of a Session, even if the rest of the Session state is empty.
  • The Client’s subscriptions.
  • QoS 1 and QoS 2 messages which have been sent to the Client, but have not been completely acknowledged.
  • QoS 1 and QoS 2 messages pending transmission to the Client.
  • QoS 2 messages which have been received from the Client, but have not been completely acknowledged.
  • Optionally, QoS 0 messages pending transmission to the Client.

4.7 websocket 支持

支持

4.8 系統主題

客戶端可通過訂閱系統主題獲取 broker 狀態,目前系統支持如下主題:

topic repeat comment
$SYS/broker/status false 訂閱此主題的客戶端會定期(mqttx.sys-topic.interval)收到 broker 的狀態,該狀態涵蓋下面所有主題的狀態值.
注意:客戶端連接斷開后,訂閱取消
$SYS/broker/activeConnectCount true 立即返回當前的活動連接數量
$SYS/broker/time true 立即返回當前時間戳
$SYS/broker/version true 立即返回 broker 版本

repeat:

  • repeat = false : 只需訂閱一次,broker 會定時發布數據到此主題.
  • repeat = true : 訂閱一次,broker 發布一次,可多次訂閱.

注意:

  1. topic 安全機制 同樣會影響客戶端訂閱系統主題, 未授權客戶端將無法訂閱系統主題
  2. 系統主題訂閱不會持久化

響應對象格式為 json 字符串:

{
	"activeConnectCount": 2,
	"timestamp": "2020-09-18 15:13:46",
	"version": "1.0.5.ALPHA"
}
field 說明
activeConnectCount 當前活躍連接數量
timestamp 時間戳;(yyyy-MM-dd HH:mm:ss)
version mqttx 版本

5 路線圖

基於我個人的認知,mqttx 接下來可能的開發計划:

  1. 集群態考慮整合服務注冊的功能,便於管理集群狀態,可能會使用 consul,做不做看我后面的想法吧
  2. bug fix and optimization,這個會一直繼續的,不過主要靠使用和學習 mqttx 的同學反饋問題給我(沒反饋我就當沒有唄~攤手.jpg)
  3. 目前正在開發基於 vue2.0, element-uimqttx-admin 管理平台,mqttx 的功能更新會暫停一段時間(最近在看 mqtt5)
    項目開發過程中發現需要對 mqttx 做一些改動,但這些改動不應該 push 給 mqttx master(比如 topic 安全認證這個功能需要配合 mqttx-platform,我可能會引入 Retrofit 處理接口調用,其實可以用 feign,我覺的這兩個都差不多),我應該會開一個業務 branch 處理這個事情。話說 javascript 寫項目可太爽了,以前怎么不覺得?
  4. mqttx 還沒壓測過,算了,看心情吧~ (有同學幫忙不?)
  5. netty 4.1.52.Final 支持了 mqtt5,em...

任何問題,請聯系我。郵箱:85998282@qq.com.

6 附表

6.1 配置項

配置 默認值 說明
mqttx.version 取自 pom.xml 版本
mqttx.brokerId 1 應用標志, 唯一
mqttx.heartbeat 50s 初始心跳,會被 conn 消息中的 keepalive 重置
mqttx.host 0.0.0.0 監聽地址
mqttx.soBacklog 512 tcp 連接處理隊列
mqttx.enableTopicSubPubSecure false 客戶訂閱/發布主題安全功能,開啟后將限制客戶端發布/訂閱的主題
mqttx.enableInnerCache true 發布消息每次都需要查詢 redis 來獲取訂閱的客戶端列表。開啟此功能后,將在內存中建立一個主題-客戶端關系映射, 應用直接訪問內存中的數據即可
mqttx.redis.clusterSessionHashKey mqttx.session.key redis map key;用於集群的會話存儲
mqttx.redis.topicPrefix mqttx:topic: 主題前綴; topic <--> client 映射關系保存
mqttx.redis.retainMessagePrefix mqttx:retain: 保留消息前綴, 保存 retian 消息
mqttx.redis.pubMsgSetPrefix mqttx:client:pubmsg: client pub消息 redis set 前綴; 保存 pubmsg,當收到 puback 獲取 pubrec 后刪除
mqttx.redis.pubRelMsgSetPrefix mqttx:client:pubrelmsg: client pubRel 消息 redis set 前綴;保存 pubrel 消息,收到 pubcom 消息刪除
mqttx.redis.topicSetKey mqttx:alltopic topic 集合,redis set key 值;保存所有的主題
mqttx.cluster.enable false 集群開關
mqttx.cluster.innerCacheConsistancyKey mqttx:cache_consistence 應用啟動后,先查詢 redis 中無此 key 值,然后在檢查一致性
mqttx.ssl.enable false ssl 開關
mqttx.ssl.keyStoreLocation classpath: tls/mqttx.keystore keyStore 位置
mqttx.ssl.keyStorePassword 用戶手動配置 keyStore 密碼
mqttx.ssl.keyStoreType pkcs12 keyStore 類別
mqttx.socket.enable true socket 開關
mqttx.socket.port 1883 socket 監聽端口
mqttx.websocket.enable false websocket 開關
mqttx.websocket.port 8083 websocket 監聽端口
mqttx.websocket.path /mqtt websocket path
mqttx.share-topic.enable true 共享主題功能開關
mqttx.share-topic.share-sub-strategy round 負載均衡策略, 目前支持隨機、輪詢、哈希
mqttx.sys-topic.enable false 系統主題功能開關
mqttx.sys-topic.interval 60s 定時發布間隔
mqttx.sys-topic.qos 0 主題 qos.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM