ActiveMQ+MQTT实现客户端订阅推送模式(一)订阅者


项目中经常会遇到这样的场景

   1)  用户注册成功之后,不仅仅会有短信通知,可能还会有系统消息弹出,或者是其他形式,但是不论是什么形式,都离不开消息的传递行为

   2)    之前在200x年的时候,我们都会使用不停的polling 轮训的方式,对后台不停的刷新,只有后端也或者是数据库中有新加入的数据,立即取出将数据展示在界面上,以便通知用户

   3)    在后来也就是大部分现在的模式,都是使用websocket的形式进行服务端反推送的模式,这样效果可以达到,但是如果用户注册成功之后,就退出系统,或者是用户没有手机通知,这个时候,就算用户注册

成功,他也不知道自己已经注册成功了,因为我们都知道websocket本身是不具备消息持久化的

    综上所述,我们可以使用ActiveMQ/RabbitMQ+MQTT协议+前端mqtt.js 实现消息的同步以及持久化,这样就解决了,如果该用户注册之后,立即退出,等他上线之后,会通知他,之前注册的结果是成功还是

失败

   1 工具汇总介绍:

    IDE:Eclipse 2019.6 若有版本问题请更换idea2018之后的版本,不在赘述,我这里是用的是基于Eclipse的 spring sts4

    ActiveMQ:5.15.12  linux下安装 linux使用centos6.x/7.x均可   RHEL

    MQTT:v3版本  (MQTT只是一种协议,并非是一个产品,而ActiveMQ是包含MQTT协议的一款产品)

    web:用简单的web html5 渲染一下即可

    MQTT.js :https://github.com/mqttjs/MQTT.js 直接下载即可,也可以从其他网站引入,不多可能时间加载会相对较长一点

   2 系统准备:

     使用上面介绍的开发工具以及相关插件包,进行如下部署

     2.1 打开Eclipse,新建一个springboot项目,其实自己演示的话,什么项目类型都无所谓的,一般的webapp项目也可以,maven类型的托管也可以,只不过现在走一把主流用boot而已,也确实简单,项目路

 径以及项目名称,坐标地址,打包方式,各位随意,如下图所示:

       

    直接点击finish待项目构建成功之后即可,若项目构建途中失败,可以自己获取依赖包进行安装(关键要看maven仓库的安装路径),后期的工作不在赘述。

     包结构如下:

      

    com.mqttapp: 核心包是不能改变位置的,除非是包的层级关系发生变更,否则一般不走变更

    com.mqttapp.config:一些常用的mqtt的一些工具方法放在这个包下

    com.mqttapp.controller: 对外界的一些接口位置,存放在此处

    com.mqttapp.handler: 句柄处理,其实这块就是具体的业务逻辑存放的地方

    com.mqttapp.service: 业务接口定义的包

    至此后台的单体架构准备完毕,跟着就是对于场景的描述

   

    可以看出,在web端我们可以任意订阅消息Broker中的主题Topic,在订阅成功之后,mqttjs会有各种回调方法,这些回调方法,在实际开发中还是比较有用的

    onMessage->onMessageArrived->当有消息到达web端的时候,即刻出发该方法

    onConnectionLost->若连接丢失,或者出现了其他连接上的问题,会触发该方法

    onConnect 当web端的mqtt与服务端的Broker连接成功之后会触发该方法,这个方法很重要,里面涉及到了订阅主题,以及指定连接消息质量qos等定义

   mqttws31.js整体配置如下:

  web端页面

<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>mqttws31.min.js 测试</title>
    <style>
        .divblock {
            display: inline-block; padding: 20px; border: 2px solid #00ff00; border-radius: 6px; margin: 20px 0px; user-select: none; } .divblock:active { background-color: #455072; border: 1px solid #0044ff; } </style> <script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.min.js" type="text/javascript"></script> </head> <body> <div>MQTT V3 socket协议测试</div> <div class="divblock" onclick="Onmqtttest()">mqttweb端</div><br/><br/> ClientID:<input type="text" value="" name="clientid" id="clientid"/><br/><br/> Topic: &nbsp;&nbsp;&nbsp;<input type="text" value="" name="topic" id="topic"/><br/><br/> <input type="button" value="订阅指定主题" name="subScriberBtn" onclick="subscriber()"/> </body> <script> var options = { timeout: 50, keepAliveInterval: 100, useSSL: false, // userName: userName, // password: password,  onSuccess: onConnect, onFailure: function (e) { console.log(e); s = "{time:" + new Date().Format("yyyy-MM-dd hh:mm:ss") + ", onFailure()}"; console.log(s); } }; var client = null; function subscriber(){ // Create a client instance var clientid = document.getElementById("clientid").value; client = new Paho.MQTT.Client('127.0.0.1',Number(61614),clientid); // "hostname,port,clientid" // set callback handlers client.onConnectionLost = onConnectionLost; // 指定丢失事件 client.onMessageArrived = onMessageArrived; // 当对端消息到达web端之后处理 // connect the client 指定mqtt 事件 onSuccess 回调函数 // client.connect(options);  client.connect( { onSuccess: onConnect, cleanSession:false //这里是表示作为持久化订阅者出现,不清楚在broker中的缓存数据 } ); } // called when the client connects 成功连接mqtt服务器之后的事件函数 function onConnect() { //Once a connection has been made, make a subscription and send a message. console.log("onConnect"); //这里可以订阅多个不同的主题,只需要迭代对象数组即可 client.subscribe("bigdata",{qos:2}); // 订阅主题带qos的参数 //client.subscribe("hotekey_cloud"); // 订阅主题  } // called when the client loses its connection mqtt 丢失或连接不存在而触发的事件函数 function onConnectionLost(responseObject) { if (responseObject.errorCode !== 0) { console.log("onConnectionLost:" + responseObject.errorMessage); } } // called when a message arrives 接收到订阅消息 function onMessageArrived(message) { console.log("onMessageArrived:" + message.payloadString); //若发布者发送的消息为对象类型,需要使用Json工具进行反序列化为对象,才能获取报文中的数据 } // 推送消息函数 function mqttPublish(sendata) { message = new Paho.MQTT.Message(sendata); // 消息内容 message.destinationName = "bigdata"; // 目标主题 client.send(message); // 推送主题  } // 用户程序点击事件 function Onmqtttest() { message = "message from browser with mqtt protocol"; // 消息内容  mqttPublish(message); } </script> </html>

 我们在定义订阅的主题协议时,可以这么去考虑

 1 我们的交互场景有几种?没一种场景是否会再次细分 即主题的分层结构

    /Sys/Module/xxxxx

    这样的分层结构比较常用,需要注意的是符号 "/"在ActiveMQ的Broker中会以符号"."出现

 2 若需要一对一交互,我们在设计topic时,需要加上详细的对象编号或者是用户编号,或者是设别编号 如:

     /Sys/Notice/UserRegister/1449

 以上描述的是Mqtt协议中的通配符部分,更详细的通配符请查看博客 https://www.cnblogs.com/newloft/articles/13034605.html


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM