php 利用activeMq+stomp實現消息隊列


php 利用activeMq+stomp實現消息隊列

一、activeMq概述

  ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,盡管JMS規范出台已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。

二、特性列表

⒈ 多種語言和協議編寫客戶端。語言: Java,C,C++,C#, Ruby, Perl, Python, PHP。應用協議: OpenWire,Stomp REST,WS Notification,XMPP, AMQP
⒉ 完全支持 JMS1.1和 J2EE 1.4規范 (持久化,XA消息, 事務)
⒊ 對Spring的支持,ActiveMQ可以很容易內嵌到使用Spring的系統里面去,而且也支持Spring2.0的特性
⒋ 通過了常見J2EE 服務器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的測試,其中通過JCA 1.5 resource adaptors的配置,可以讓ActiveMQ可以自動的部署到任何兼容J2EE 1.4 商業服務器上
⒌ 支持多種傳送協議:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
⒍ 支持通過JDBC和journal提供高速的消息持久化
⒎ 從設計上保證了高性能的集群,客戶端- 服務器,點對點
⒏ 支持 Ajax
⒐ 支持與Axis的整合
⒑ 可以很容易的調用內嵌JMS provider,進行測試

三、運行環境

  在本地或者服務器中安裝activeMq,以Windows為例:

A、 windows下部署

ActiveMQ部署其實很簡單,和所有Java一樣,要跑java程序就必須先安裝JDK並配置好環境變量,這個很簡單。

然后解壓下載的apache-activemq-5.10-20140603.133406-78-bin.zip壓縮包到一個目錄,得到解壓后的目錄結構如下圖:

進入bin目錄,發現有win32和win64兩個文件夾,這2個文件夾分別對應windows32位和windows64位操作系統的啟動腳本。

我的實驗環境是windowsXP,就進入win32目錄,會看到如下目錄結構。

其中activemq.bat便是啟動腳本,雙擊啟動。

ActiveMQ默認啟動到8161端口,啟動完了后在瀏覽器地址欄輸入:http://localhost:8161/admin要求輸入用戶名密碼,默認用戶名密碼為admin、admin,這個用戶名密碼是在conf/users.properties中配置的。輸入用戶名密碼后便可看到如下圖的ActiveMQ控制台界面了。

四、生產消息

  本文使用stomp協議實現mq隊列 ,項目中要加載stomp 包;  

  在需要使用隊列的地方,導入該包:

  

use App\Libraries\Stomp\Stomp;

  將消息放入activeMq中:

  

    protected function sendToMQ($destination, $msg_data, $persistent = false)
    {
        try {
            $con = new Stomp(config('app.mq_url'));
            $con->connect();
            $con->begin("Transaction");
            $con->send($destination, json_encode($msg_data), array('persistent'=> $persistent));
            $con->commit("Transaction");
            $con->disconnect();
        } catch (\Exception $e) {
            app('log')->warn($e->getMessage());
        }
    }

  1.destination是指隊列名稱;

  2.msg_data隊列中存放的數據;

  3.persistent是否同步; 

  4.app.mq_url是指activeMq安裝的服務器地址及端口號,如tcp://localhost:61613;

  經過段代碼之后,我們就將數據msg_data放到了隊列destination中了;

五、消費消息

  在activeMq中的消息如何消費呢?一般情況下我們會建立一個cronjob來定時消費隊列的消息,消費隊列消息主要代碼如下:

        try {
            $this->consumer = new Stomp($this->activemq_uri); //$this->acitvemq_uri就是上面的activeMq地址app.mq_url
            if (!$this->consumer->isConnected()) {
                $this->consumer->connect();
                $this->consumer->setReadTimeout(3);
            }
            app('log')->info($this->log_remark . "connect to active mq success");
        } catch (StompException $e) {
            app('log')->info("connect to active mq failed : " . $e->getMessage());
            die();
        }
        $queue = self::getQueue();//得到隊列名稱,上面定義的destination
        $this->consumer->subscribe($queue);  //訂閱
        //循環讀幀
        while ($this->consumer->hasFrameToRead()) {
            try {
                $message = $this->consumer->readFrame();
                $data = json_decode($message->body, true);  // 這里其實就是得到了上面的隊列消息msg_data
                //驗證數據並更新數據
                $handle_result = self::removeDuplicateExpressInfo($data['traces'], $data['shipping_id']);
                if ($handle_result) {
                    $this->consumer->ack($message);
                }
            } catch (\Exception $e) {
                app('log')->info("handle with message failed : " . $e->getMessage());
                if (!$this->consumer->isConnected()) {
                    $this->consumer->connect();
                } else {
                    break;
                }
            }
        }
        $this->consumer->unsubscribe($queue);    //釋放訂閱
        $this->consumer->disconnect();      //端口連接

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM