ActiveMQ+MQTT實現客戶端訂閱推送模式(一)訂閱者


項目中經常會遇到這樣的場景

   1)  用戶注冊成功之后,不僅僅會有短信通知,可能還會有系統消息彈出,或者是其他形式,但是不論是什么形式,都離不開消息的傳遞行為

   2)    之前在200x年的時候,我們都會使用不停的polling 輪訓的方式,對后台不停的刷新,只有后端也或者是數據庫中有新加入的數據,立即取出將數據展示在界面上,以便通知用戶

   3)    在后來也就是大部分現在的模式,都是使用websocket的形式進行服務端反推送的模式,這樣效果可以達到,但是如果用戶注冊成功之后,就退出系統,或者是用戶沒有手機通知,這個時候,就算用戶注冊

成功,他也不知道自己已經注冊成功了,因為我們都知道websocket本身是不具備消息持久化的

    綜上所述,我們可以使用ActiveMQ/RabbitMQ+MQTT協議+前端mqtt.js 實現消息的同步以及持久化,這樣就解決了,如果該用戶注冊之后,立即退出,等他上線之后,會通知他,之前注冊的結果是成功還是

失敗

   1 工具匯總介紹:

    IDE:Eclipse 2019.6 若有版本問題請更換idea2018之后的版本,不在贅述,我這里是用的是基於Eclipse的 spring sts4

    ActiveMQ:5.15.12  linux下安裝 linux使用centos6.x/7.x均可   RHEL

    MQTT:v3版本  (MQTT只是一種協議,並非是一個產品,而ActiveMQ是包含MQTT協議的一款產品)

    web:用簡單的web html5 渲染一下即可

    MQTT.js :https://github.com/mqttjs/MQTT.js 直接下載即可,也可以從其他網站引入,不多可能時間加載會相對較長一點

   2 系統准備:

     使用上面介紹的開發工具以及相關插件包,進行如下部署

     2.1 打開Eclipse,新建一個springboot項目,其實自己演示的話,什么項目類型都無所謂的,一般的webapp項目也可以,maven類型的托管也可以,只不過現在走一把主流用boot而已,也確實簡單,項目路

 徑以及項目名稱,坐標地址,打包方式,各位隨意,如下圖所示:

       

    直接點擊finish待項目構建成功之后即可,若項目構建途中失敗,可以自己獲取依賴包進行安裝(關鍵要看maven倉庫的安裝路徑),后期的工作不在贅述。

     包結構如下:

      

    com.mqttapp: 核心包是不能改變位置的,除非是包的層級關系發生變更,否則一般不走變更

    com.mqttapp.config:一些常用的mqtt的一些工具方法放在這個包下

    com.mqttapp.controller: 對外界的一些接口位置,存放在此處

    com.mqttapp.handler: 句柄處理,其實這塊就是具體的業務邏輯存放的地方

    com.mqttapp.service: 業務接口定義的包

    至此后台的單體架構准備完畢,跟着就是對於場景的描述

   

    可以看出,在web端我們可以任意訂閱消息Broker中的主題Topic,在訂閱成功之后,mqttjs會有各種回調方法,這些回調方法,在實際開發中還是比較有用的

    onMessage->onMessageArrived->當有消息到達web端的時候,即刻出發該方法

    onConnectionLost->若連接丟失,或者出現了其他連接上的問題,會觸發該方法

    onConnect 當web端的mqtt與服務端的Broker連接成功之后會觸發該方法,這個方法很重要,里面涉及到了訂閱主題,以及指定連接消息質量qos等定義

   mqttws31.js整體配置如下:

  web端頁面

<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>mqttws31.min.js 測試</title>
    <style>
        .divblock {
            display: inline-block; padding: 20px; border: 2px solid #00ff00; border-radius: 6px; margin: 20px 0px; user-select: none; } .divblock:active { background-color: #455072; border: 1px solid #0044ff; } </style> <script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.min.js" type="text/javascript"></script> </head> <body> <div>MQTT V3 socket協議測試</div> <div class="divblock" onclick="Onmqtttest()">mqttweb端</div><br/><br/> ClientID:<input type="text" value="" name="clientid" id="clientid"/><br/><br/> Topic: &nbsp;&nbsp;&nbsp;<input type="text" value="" name="topic" id="topic"/><br/><br/> <input type="button" value="訂閱指定主題" name="subScriberBtn" onclick="subscriber()"/> </body> <script> var options = { timeout: 50, keepAliveInterval: 100, useSSL: false, // userName: userName, // password: password,  onSuccess: onConnect, onFailure: function (e) { console.log(e); s = "{time:" + new Date().Format("yyyy-MM-dd hh:mm:ss") + ", onFailure()}"; console.log(s); } }; var client = null; function subscriber(){ // Create a client instance var clientid = document.getElementById("clientid").value; client = new Paho.MQTT.Client('127.0.0.1',Number(61614),clientid); // "hostname,port,clientid" // set callback handlers client.onConnectionLost = onConnectionLost; // 指定丟失事件 client.onMessageArrived = onMessageArrived; // 當對端消息到達web端之后處理 // connect the client 指定mqtt 事件 onSuccess 回調函數 // client.connect(options);  client.connect( { onSuccess: onConnect, cleanSession:false //這里是表示作為持久化訂閱者出現,不清楚在broker中的緩存數據 } ); } // called when the client connects 成功連接mqtt服務器之后的事件函數 function onConnect() { //Once a connection has been made, make a subscription and send a message. console.log("onConnect"); //這里可以訂閱多個不同的主題,只需要迭代對象數組即可 client.subscribe("bigdata",{qos:2}); // 訂閱主題帶qos的參數 //client.subscribe("hotekey_cloud"); // 訂閱主題  } // called when the client loses its connection mqtt 丟失或連接不存在而觸發的事件函數 function onConnectionLost(responseObject) { if (responseObject.errorCode !== 0) { console.log("onConnectionLost:" + responseObject.errorMessage); } } // called when a message arrives 接收到訂閱消息 function onMessageArrived(message) { console.log("onMessageArrived:" + message.payloadString); //若發布者發送的消息為對象類型,需要使用Json工具進行反序列化為對象,才能獲取報文中的數據 } // 推送消息函數 function mqttPublish(sendata) { message = new Paho.MQTT.Message(sendata); // 消息內容 message.destinationName = "bigdata"; // 目標主題 client.send(message); // 推送主題  } // 用戶程序點擊事件 function Onmqtttest() { message = "message from browser with mqtt protocol"; // 消息內容  mqttPublish(message); } </script> </html>

 我們在定義訂閱的主題協議時,可以這么去考慮

 1 我們的交互場景有幾種?沒一種場景是否會再次細分 即主題的分層結構

    /Sys/Module/xxxxx

    這樣的分層結構比較常用,需要注意的是符號 "/"在ActiveMQ的Broker中會以符號"."出現

 2 若需要一對一交互,我們在設計topic時,需要加上詳細的對象編號或者是用戶編號,或者是設別編號 如:

     /Sys/Notice/UserRegister/1449

 以上描述的是Mqtt協議中的通配符部分,更詳細的通配符請查看博客 https://www.cnblogs.com/newloft/articles/13034605.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM