js連接RabbitMQ達到實時消息推送
最近在自己捯飭一個網站,有一個功能是需要后端處理完數據把數據發布到MQ中,前端再從MQ中接收數據。但是前端連接MQ又成了一個問題,在網上搜了下資料,點進去一篇IBM DW后發現竟然是超哥寫的,真是巧哈~因為超哥寫的很好所以很多我就直接摘抄過來了,他應該不會介意的(逃。
參考
實現服務器端推送的幾種方式
Web 應用都是基於 HTTP 協議的請求/響應模式,無法像 TCP 協議那樣保持長連接,因此 Web 應用就很難像手機那樣實現實時的消息推送。就目前來看,Web 應用的消息推送方式主要有以下幾種:
1.Ajax 短輪詢
Ajax 輪詢主要通過頁面端的 JS 定時異步刷新任務來實現數據的加載,但這種方式實時效果較差,而且對服務端的壓力也較大。
2.長輪詢
長輪詢主要也是通過 Ajax 機制,但區別於傳統的 Ajax 應用,長輪詢的服務器端會在沒有數據時阻塞請求直到有新的數據產生或者請求超時才返回,之后客戶端再重新建立連接獲取數據。但長輪詢服務端會長時間地占用資源,如果消息頻繁發送的話會給服務端帶來較大的壓力。
3.WebSocket 雙向通信
WebSocket 是 HTML5 中一種新的通信協議,能夠實現瀏覽器與服務器之間全雙工通信。如果瀏覽器和服務端都支持 WebSocket 協議的話,該方式實現的消息推送無疑是最高效、簡潔的。並且最新版本的 IE、Firefox、Chrome 等瀏覽器都已經支持 WebSocket 協議,Apache Tomcat 7.0.27 以后的版本也開始支持 WebSocket。
安裝RabbitMQ
在macOS上安裝rabbitmq(提前已經安裝了brew):
brew install rabbitmq
啟動stomp有關的一系列插件
rabbitmq-plugins enable rabbitmq_management rabbitmq_web_stomp rabbitmq_stomp rabbitmq_web_stomp_examples
可以通過下面這條命令查看已啟動了哪些RabbitMQ插件:
rabbitmq-plugins list
重啟rabbitmq:
brew services restart rabbitmq
我們可以在15670端口訪問web-stomp-examples,
http://localhost:15670/
其中RabbitMQ運行在15672端口,stomp服務運行在15674端口。
前端通過websocket連接RabbitMQ
RabbitMQ 有很多第三方插件,可以在 AMQP 協議基礎上做出許多擴展的應用。Web STOMP 插件就是基於 AMQP 之上的 STOMP 文本協議插件,利用 WebSocket 能夠輕松實現瀏覽器和服務器之間的實時消息傳遞,具體實現方式如下圖所示:
RabbitMQ Web STOMP 插件可以理解為 HTML5 WebSocket 與 STOMP 協議間的橋接,目的也是為了讓瀏覽器能夠使用 RabbitMQ。當 RabbitMQ 消息服務器開啟了 STOMP 和 Web STOMP 插件后,瀏覽器端就可以輕松地使用 WebSocket 或者 SockerJS 客戶端實現與 RabbitMQ 服務器進行通信。
前端通過stomp連接RabbitMQ的代碼如下:
// 初始化 ws 對象
if (location.search == '?ws') {
var ws = new WebSocket('ws://localhost:15674/ws');
} else {
var ws = new SockJS('http://localhost:15674/stomp');
}
// 獲得Stomp client對象
var client = Stomp.over(ws);
// SockJS does not support heart-beat: disable heart-beats
client.heartbeat.outgoing = 0;
client.heartbeat.incoming = 0;
client.debug = pipe('#second');
// 定義連接成功回調函數
var on_connect = function(x) {
//data.body是接收到的數據
client.subscribe("/queue/default", function(data) {
var msg = data.body;
alert("收到數據:" + msg);
});
};
// 定義錯誤時回調函數
var on_error = function() {
console.log('error');
};
// 連接RabbitMQ
client.connect('guest', 'guest', on_connect, on_error, '/');
console.log(">>>連接上http://localhost:15674");
我們可以看到代碼主要包括以下幾個部分:
- 初始化websocket對象
- 構造stomp client
- 定義連接成功的回調函數
- 定義連接錯誤時的回調函數
- 連接RabbitMQ
因此我們要做的主要是定義這個連接成功的回調函數,其中:
client.subscribe("/queue/default", function(data) {
var msg = data.body;
alert("收到數據:" + msg);
});
這個函數的功能是訂閱了一個名為"default"的queue,當有生產者向該隊列發送數據時,該函數會作為消費者接收到數據,並觸發回調函數。我們可以在回調函數中對接收到的數據進行展示,更新到界面上,從而可以達到和輪詢一樣的效果。
當我們把這段js放進html中run起來后,通過Chrome F12工具也可以看到這樣的提示:
Opening Web Socket...
stomp.js:114 Web Socket Opened...
stomp.js:114 >>> CONNECT
accept-version:1.1,1.0
heart-beat:0,0
host:/
login:guest
passcode:guest
stomp.js:114 <<< CONNECTED
server:RabbitMQ/3.6.6
session:***
heart-beat:0,0
version:1.1
stomp.js:114 connected to server RabbitMQ/3.6.6
stomp.js:114 >>> SUBSCRIBE
id:sub-0
destination:/queue/default
當我們向/queue/default發送一條測試數據“aaa”,前端js就會接收到數據,再看F12工具中的顯示:
<<< MESSAGE
subscription:sub-0
destination:/queue/default
message-id:***
redelivered:false
persistent:1
content-length:3
aaa
可以看到已經收到了"aaa"這條消息,並且可以注意到Stomp協議是以文本格式進行數據傳輸的,而RabbitMQ是以二進制傳輸的。通過data.body就可以獲取到Stomp中的消息的主體了。
思考
關於websocket連接MQ進行實時消息推送,固然是實現消息推送的一個很好的方式,但是不得不思考在大量並發情況下,MQ端是否能承受的了數量龐大的websocket連接。MQ將前后端進行了解耦和異步化,但是也可能成為系統的性能瓶頸。對於這種實現方式,目前只能說是且學且用,性能方面還尚待考究。