使用ActiveMQ Apollo實現即時消息推送


前言

現在大多網站會員系統或雲端協作平台上都有即時消息通知功能,即消息推送,這對用戶來說是非常貼心的功能。要實現消息推送服務,大致可以采用以下幾種方式:

  1. 使用HTTP輪循方式
    • 說明:定時向HTTP服務端接口(Web Service API)獲取最新消息,可結合ajax技術實現頁面無刷新效果,這是主動拉取消息的機制,嚴格來說這不屬於消息推送。
    • 優點:實現簡單、可控性強、部署成本低
    • 缺點:實時性差
  2. 使用XMPP協議
    • 說明:XMPP(可擴展消息處理現場協議)是基於可擴展標記語言(XML)的協議,它用於即時消息(IM)以及在線現場探測。它促進在服務器之間的准即時操作,其前身是Jabber,是一個開源形式組織產生的網絡即時通信協議。XMPP目前被IETF國際標准組織完成了標准化工作。
    • 優點:協議成熟、強大、可擴展性強、目前主要應用於眾多IM系統
    • 缺點:協議比較復雜、冗余(基於XML)、費流量、部署成本高高。
  3. 使用MQTT協議
    • 說明:MQTT協議是IBM的Andy Stanford-Clark博士以及Arcom公司ArlenNipper博士發明的,是輕量級的、基於代理的“發布/訂閱”模式的消息傳輸協議
    • 優點:MQTT協議簡潔、小巧、可擴展性強、流量開銷很小、目前已經應用到企業領域
    • 缺點:還不夠成熟、實現較復雜、服務端組件不開源,部署成本較高

EasyPM作為一個團隊協作的項目管理平台,消息推送功能是少不了的。我們綜合眾多因素,最終采用MQTT協議方式,在眾多MQTT協議實現架構中,我們選擇了Apollo的實現架構。下邊就介紹下基於Apollo架構如何實現消息推送和接收。

 

Apollo是什么?

Apollo是apache旗下的基金項目,它是以Apache ActiveMQ5.x為基礎,采用全新的線程和消息調度架構重新實現的消息中間件,針對多核處理器進行了優化處理,它的速度更快、更可靠、更易於維護。apollo與ActiveQQ一樣支持多協議:STOMP、AMQP、MQTT、Openwire、 SSL、WebSockets,本文只介紹MQTT協議的使用。

 

參考資源

  1. MQTT
  2. ActiveMQ5
  3. Apollo官方文檔
  4. eclipse paho

 

Apollo的下載和安裝(本文僅介紹wendows系統安裝)

進入Apollo下載頁面 ,下載windows版本的壓縮包並解壓到自己的工作目錄(如:E:\apache-apollo-1.7),並創建系統環境變量APOLLO_HOME=E:\apache-apollo-1.7

如果操作是系統是Windows Vista或更高版本,則需要安裝Microsoft Visual C++ 2010 Redistributable:
64位JVM
32位JVM

 

Apollo實例的創建和服務啟動

  1. 創建實例
    進入E:\apache-apollo-1.7之下的bin目錄,打開cmd窗口,執行命令:apollo create D:\apollo_broker,命令執行成功后,在D盤下會有apollo_broker目錄,這便是apollo的服務實例,apollo之旅便從這里開始。
    D:\apollo_broker下有個bin目錄,其中有兩個文件:
    apollo-broker.cmd是通過cmd命令啟動apollo服務的
    apollo-broker-service.exe,是用於創建window服務的
  2. 命令行啟動服務
    D:\apollo_broker\bin目錄下打開cmd窗口,執行apollo-broker run命令來啟動apollo服務,
    啟動成功可以在瀏覽器中查看運行情況,訪問地址為 http://127.0.0.1:61680 , 默認用戶名/密碼:admin/password
  3. 創建windows服務
    找到cmd.exe文件,點擊鼠標右鍵,以管理員身份運行,輸入創建windows服務命令,如下圖:
    EasyPM 團隊協作 敏捷看板
    創建成功后,在windows服務中會有一個apollo_broker服務,設置其自動啟動,啟動系統時apollo就會自動啟動了
    EasyPM 項目管理 迭代規划

 

MQTT協議的應用

MQTT協議有眾多客戶端實現,相關客戶端請參考apollo官方文檔
本文采用eclipse的paho客戶端實現

  1. javascript客戶端實現
    將javascript客戶端項目下載下來,並在其項目根目錄下執行mvn命令,進行編譯,生成target目錄,其下生成mqttws31.js、mqttws31-min.js兩個js文件,將其拷貝到自己項目相關目錄下,並在頁面中引用,即可實現javascript客戶端的消息訂閱和發布,下邊時demo代碼:
    var client = new Paho.MQTT.Client(location.hostname, 61623,"/", "clientId"); 
    /* 61623是ws連接的默認端口,可以在apollo中間件中進行配置
    (關於apollo的配置請參考:
    http://activemq.apache.org/apollo/documentation/user-manual.html
    ) */
    // set callback handlers 
    client.onConnectionLost = onConnectionLost; 
    client.onMessageArrived = onMessageArrived; 
    // connect the client 
    client.connect({userName:'admin',password:'password',onSuccess:onConnect}); 
    // called when the client connects 
    function onConnect() { // 連接成功后的處理 
         // Once a connection has been made, make a subscription and send a message. 
         console.log("onConnect"); 
         client.subscribe("/topic/event"); // 訂閱消息的主題 
         var message = new Paho.MQTT.Message("Hello,this is a test"); 
         message.destinationName = "/topic/event"; 
         client.send(message); // 發送消息 
    } 
    // called when the client loses its connection 
    function onConnectionLost(responseObject) { // 連接丟失后的處理 
         if (responseObject.errorCode !== 0) { 
                 console.log("onConnectionLost:"+responseObject.errorMessage); 
         } 
    } 
    // called when a message arrives 
    function onMessageArrived(message) { // 消息接收成功后的處理 
         console.log("onMessageArrived:"+message.payloadString); 
    }
    
  2. java客戶端實現
    paho java客戶端目前只支持J2SE和安卓,提供源碼下載和maven庫。
    我們采用maven庫,其地址如下:
    Official Releases
    Nightly Snapshots
    說明:設定版本為1.0.0或0.9.0時,其jar包根本加載不進來,最后搜到1.0.1版本才可以正常使用。
    maven dependency配置:
    <dependency> 
        <groupId>org.eclipse.paho</groupId> 
        <artifactId>org.eclipse.paho.client.mqttv3</artifactId> 
        <version>1.0.1</version> 
    </dependency>
    
    java實現代碼:
        String topic        = "MQTT Examples"; 
        String content      = "Message from MqttPublishSample";
        int qos             = 2;
        String broker       = "tcp://127.0.0.1:61613";
        String clientId     = "JavaSample";
        MemoryPersistence persistence = new MemoryPersistence();
        try {
            MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
            MqttConnectOptions connOpts = new MqttConnectOptions();
            connOpts.setCleanSession(true);
            System.out.println("Connecting to broker: "+broker);
            sampleClient.connect(connOpts);
            System.out.println("Connected");
            System.out.println("Publishing message: "+content);
            MqttMessage message = new MqttMessage(content.getBytes());
            message.setQos(qos);
            sampleClient.publish(topic, message);
            System.out.println("Message published");
            sampleClient.disconnect();
            System.out.println("Disconnected");
            System.exit(0);
        } catch(MqttException me) {
            System.out.println("reason "+me.getReasonCode());
            System.out.println("msg "+me.getMessage());
            System.out.println("loc "+me.getLocalizedMessage());
            System.out.println("cause "+me.getCause());
            System.out.println("excep "+me);
            me.printStackTrace();
        }
    

 

小結

至此,MQTT協議已部署完畢,java端可以發布消息,而javascript端則可以訂閱並接收到java端發布的信息。
本文只是依照官網手冊而實現的簡單應用,講解不一定十分准確,有什么不對的地方還請多多指點,更詳細的應用請參考官網文檔:
apollo
eclipse paho


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM