php 利用activeMq+stomp實現消息隊列
一、activeMq概述
ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規范的 JMS Provider實現,盡管JMS規范出台已經是很久的事情了,但是JMS在當今的J2EE應用中間仍然扮演着特殊的地位。
二、特性列表
三、運行環境
在本地或者服務器中安裝activeMq,以Windows為例:
A、 windows下部署
ActiveMQ部署其實很簡單,和所有Java一樣,要跑java程序就必須先安裝JDK並配置好環境變量,這個很簡單。
然后解壓下載的apache-activemq-5.10-20140603.133406-78-bin.zip壓縮包到一個目錄,得到解壓后的目錄結構如下圖:
進入bin目錄,發現有win32和win64兩個文件夾,這2個文件夾分別對應windows32位和windows64位操作系統的啟動腳本。
我的實驗環境是windowsXP,就進入win32目錄,會看到如下目錄結構。
其中activemq.bat便是啟動腳本,雙擊啟動。
ActiveMQ默認啟動到8161端口,啟動完了后在瀏覽器地址欄輸入:http://localhost:8161/admin要求輸入用戶名密碼,默認用戶名密碼為admin、admin,這個用戶名密碼是在conf/users.properties中配置的。輸入用戶名密碼后便可看到如下圖的ActiveMQ控制台界面了。
四、生產消息
本文使用stomp協議實現mq隊列 ,項目中要加載stomp 包;
在需要使用隊列的地方,導入該包:
use App\Libraries\Stomp\Stomp;
將消息放入activeMq中:
protected function sendToMQ($destination, $msg_data, $persistent = false) { try { $con = new Stomp(config('app.mq_url')); $con->connect(); $con->begin("Transaction"); $con->send($destination, json_encode($msg_data), array('persistent'=> $persistent)); $con->commit("Transaction"); $con->disconnect(); } catch (\Exception $e) { app('log')->warn($e->getMessage()); } }
1.destination是指隊列名稱;
2.msg_data隊列中存放的數據;
3.persistent是否同步;
4.app.mq_url是指activeMq安裝的服務器地址及端口號,如tcp://localhost:61613;
經過段代碼之后,我們就將數據msg_data放到了隊列destination中了;
五、消費消息
在activeMq中的消息如何消費呢?一般情況下我們會建立一個cronjob來定時消費隊列的消息,消費隊列消息主要代碼如下:
try { $this->consumer = new Stomp($this->activemq_uri); //$this->acitvemq_uri就是上面的activeMq地址app.mq_url if (!$this->consumer->isConnected()) { $this->consumer->connect(); $this->consumer->setReadTimeout(3); } app('log')->info($this->log_remark . "connect to active mq success"); } catch (StompException $e) { app('log')->info("connect to active mq failed : " . $e->getMessage()); die(); } $queue = self::getQueue();//得到隊列名稱,上面定義的destination $this->consumer->subscribe($queue); //訂閱 //循環讀幀 while ($this->consumer->hasFrameToRead()) { try { $message = $this->consumer->readFrame(); $data = json_decode($message->body, true); // 這里其實就是得到了上面的隊列消息msg_data //驗證數據並更新數據 $handle_result = self::removeDuplicateExpressInfo($data['traces'], $data['shipping_id']); if ($handle_result) { $this->consumer->ack($message); } } catch (\Exception $e) { app('log')->info("handle with message failed : " . $e->getMessage()); if (!$this->consumer->isConnected()) { $this->consumer->connect(); } else { break; } } } $this->consumer->unsubscribe($queue); //釋放訂閱 $this->consumer->disconnect(); //端口連接