前言
現在大多網站會員系統或雲端協作平台上都有即時消息通知功能,即消息推送,這對用戶來說是非常貼心的功能。要實現消息推送服務,大致可以采用以下幾種方式:
- 使用HTTP輪循方式
- 說明:定時向HTTP服務端接口(Web Service API)獲取最新消息,可結合ajax技術實現頁面無刷新效果,這是主動拉取消息的機制,嚴格來說這不屬於消息推送。
- 優點:實現簡單、可控性強、部署成本低
- 缺點:實時性差
- 使用XMPP協議
- 說明:XMPP(可擴展消息處理現場協議)是基於可擴展標記語言(XML)的協議,它用於即時消息(IM)以及在線現場探測。它促進在服務器之間的准即時操作,其前身是Jabber,是一個開源形式組織產生的網絡即時通信協議。XMPP目前被IETF國際標准組織完成了標准化工作。
- 優點:協議成熟、強大、可擴展性強、目前主要應用於眾多IM系統
- 缺點:協議比較復雜、冗余(基於XML)、費流量、部署成本高高。
- 使用MQTT協議
- 說明:MQTT協議是IBM的Andy Stanford-Clark博士以及Arcom公司ArlenNipper博士發明的,是輕量級的、基於代理的“發布/訂閱”模式的消息傳輸協議
- 優點:MQTT協議簡潔、小巧、可擴展性強、流量開銷很小、目前已經應用到企業領域
- 缺點:還不夠成熟、實現較復雜、服務端組件不開源,部署成本較高
EasyPM作為一個團隊協作的項目管理平台,消息推送功能是少不了的。我們綜合眾多因素,最終采用MQTT協議方式,在眾多MQTT協議實現架構中,我們選擇了Apollo的實現架構。下邊就介紹下基於Apollo架構如何實現消息推送和接收。
Apollo是什么?
Apollo是apache旗下的基金項目,它是以Apache ActiveMQ5.x為基礎,采用全新的線程和消息調度架構重新實現的消息中間件,針對多核處理器進行了優化處理,它的速度更快、更可靠、更易於維護。apollo與ActiveQQ一樣支持多協議:STOMP、AMQP、MQTT、Openwire、 SSL、WebSockets,本文只介紹MQTT協議的使用。
參考資源
Apollo的下載和安裝(本文僅介紹wendows系統安裝)
進入Apollo下載頁面 ,下載windows版本的壓縮包並解壓到自己的工作目錄(如:
E:\apache-apollo-1.7
),並創建系統環境變量APOLLO_HOME=E:\apache-apollo-1.7
。如果操作是系統是Windows Vista或更高版本,則需要安裝Microsoft Visual C++ 2010 Redistributable:
64位JVM
32位JVM
Apollo實例的創建和服務啟動
- 創建實例
進入E:\apache-apollo-1.7
之下的bin目錄,打開cmd窗口,執行命令:apollo create D:\apollo_broker
,命令執行成功后,在D盤下會有apollo_broker
目錄,這便是apollo的服務實例,apollo之旅便從這里開始。
在D:\apollo_broker
下有個bin目錄,其中有兩個文件:
apollo-broker.cmd是通過cmd命令啟動apollo服務的
apollo-broker-service.exe,是用於創建window服務的 - 命令行啟動服務
在D:\apollo_broker\bin
目錄下打開cmd窗口,執行apollo-broker run
命令來啟動apollo服務,
啟動成功可以在瀏覽器中查看運行情況,訪問地址為 http://127.0.0.1:61680 , 默認用戶名/密碼:admin/password
- 創建windows服務
找到cmd.exe
文件,點擊鼠標右鍵,以管理員身份運行,輸入創建windows服務命令,如下圖:
創建成功后,在windows服務中會有一個apollo_broker
服務,設置其自動啟動,啟動系統時apollo就會自動啟動了
MQTT協議的應用
MQTT協議有眾多客戶端實現,相關客戶端請參考apollo官方文檔
本文采用eclipse的paho客戶端實現
- javascript客戶端實現
將javascript客戶端項目下載下來,並在其項目根目錄下執行mvn命令,進行編譯,生成target目錄,其下生成mqttws31.js、mqttws31-min.js兩個js文件,將其拷貝到自己項目相關目錄下,並在頁面中引用,即可實現javascript客戶端的消息訂閱和發布,下邊時demo代碼:var client = new Paho.MQTT.Client(location.hostname, 61623,"/", "clientId"); /* 61623是ws連接的默認端口,可以在apollo中間件中進行配置 (關於apollo的配置請參考: http://activemq.apache.org/apollo/documentation/user-manual.html ) */ // set callback handlers client.onConnectionLost = onConnectionLost; client.onMessageArrived = onMessageArrived; // connect the client client.connect({userName:'admin',password:'password',onSuccess:onConnect}); // called when the client connects function onConnect() { // 連接成功后的處理 // Once a connection has been made, make a subscription and send a message. console.log("onConnect"); client.subscribe("/topic/event"); // 訂閱消息的主題 var message = new Paho.MQTT.Message("Hello,this is a test"); message.destinationName = "/topic/event"; client.send(message); // 發送消息 } // called when the client loses its connection function onConnectionLost(responseObject) { // 連接丟失后的處理 if (responseObject.errorCode !== 0) { console.log("onConnectionLost:"+responseObject.errorMessage); } } // called when a message arrives function onMessageArrived(message) { // 消息接收成功后的處理 console.log("onMessageArrived:"+message.payloadString); }
- java客戶端實現
paho java客戶端目前只支持J2SE和安卓,提供源碼下載和maven庫。
我們采用maven庫,其地址如下:
Official Releases
Nightly Snapshots
說明:設定版本為1.0.0或0.9.0時,其jar包根本加載不進來,最后搜到1.0.1版本才可以正常使用。
maven dependency配置:
java實現代碼:<dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.0.1</version> </dependency>
String topic = "MQTT Examples"; String content = "Message from MqttPublishSample"; int qos = 2; String broker = "tcp://127.0.0.1:61613"; String clientId = "JavaSample"; MemoryPersistence persistence = new MemoryPersistence(); try { MqttClient sampleClient = new MqttClient(broker, clientId, persistence); MqttConnectOptions connOpts = new MqttConnectOptions(); connOpts.setCleanSession(true); System.out.println("Connecting to broker: "+broker); sampleClient.connect(connOpts); System.out.println("Connected"); System.out.println("Publishing message: "+content); MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); sampleClient.publish(topic, message); System.out.println("Message published"); sampleClient.disconnect(); System.out.println("Disconnected"); System.exit(0); } catch(MqttException me) { System.out.println("reason "+me.getReasonCode()); System.out.println("msg "+me.getMessage()); System.out.println("loc "+me.getLocalizedMessage()); System.out.println("cause "+me.getCause()); System.out.println("excep "+me); me.printStackTrace(); }
小結
至此,MQTT協議已部署完畢,java端可以發布消息,而javascript端則可以訂閱並接收到java端發布的信息。
本文只是依照官網手冊而實現的簡單應用,講解不一定十分准確,有什么不對的地方還請多多指點,更詳細的應用請參考官網文檔:
apollo
eclipse paho