MQTT 一个连接,订阅多个主题封装


var mqtt = require('mqtt')
var MqttClient= mqtt.connect('mqtt://test.mosquitto.org')
const linkList = [] // 长连接的列表
MqttClient.linkState = true // 这里是自定义的长连接状态

// 连接成功
MqttClient.on("connect", () => {
        MqttClient.linkState = true // 改变连接状态
        const linkArr = Object.values(MqttClient.messageIdToTopic)
        
        linkList.forEach(item => {
            if(!linkArr.includes(item)) {
                MqttClient.subscribe(item)
            }
        })

    })    

// 链接报错
MqttClient.on("error", () => {
   MqttClient.linkState = false
})


    // 链接断开
MqttClient.on("close", () => {
    MqttClient.linkState = false
})
        
// 添加主题
export function AddLink(link){
    // 当连接中 直接订阅 
    if(MqttClient.linkState) {
        MqttClient.subscribe(link)
    }
    // 简单地去重
    if(!linkList.includes(link)) linkList.push(link)   
}

// 删除取消主题
export function DeleteLink(arr) {
    arr.forEach(item => {
        const key = linkList.findIndex(link => link === item)
        if(key >= 0) {
            linkList.splice(key,1)
            MqttClient.unsubscribe(item)
        }
    })
} 
// 回调消息
export function MyMqttMsg(Callback) {

    MqttClient.on("message", (topic, res) => {
       
        Callback(JSON.parse(res))
    })
}       

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM