<?php
final class RabbitMQ{
//服務器地址
private $_host;
//服務端口
private $_port;
//路由名稱
private $_rout;
//用戶名
private $_login;
//密碼
private $_password;
//虛擬機名稱
private $_vhost ;
//交換機名稱
private $_exchangename;
//交換機flags
private $_flags;
//隊列名稱
private $_queue;
//當前類對象
private static $_obj;
//MQ服務對象
private $_MQ;
//MQ通道對象
private $_channel;
//MQ路由對象
private $_exchange;
//MQ隊列對象
private $_queueobj;
private $_queueFlags;
private function __construct(){
}
private function __clone(){
}
/**
* 配置文件設置
* @param array
*/
public function config($param){
$this->_host = $param["host"];
$this->_port = $param["port"];
$this->_login = $param["user_name"];
$this->_password = $param["password"];
$this->_queue = $param["queue_name"];
$this->_route_key = $param["route_key"];
$this->_exchangename = $param["exchangename"];
$this->_vhost = $param["vhost"];
$this->_exchangeType = $param["exchangeType"];
$this->_exchangeFlags = $param["flags"];
$this->_queueFlags = $param["flags"];
}
/**
* 獲取當前類對象實現單例
*/
public static function init(){
if(!self::$_obj instanceof self){
self::$_obj = new self;
}
return self::$_obj;
}
public function connect(){
$config = array(
"host" => $this->_host,
"port" => $this->_port,
"login" => $this->_login,
"password" => $this->_password,
"vhost" => $this->_vhost,
);
//創建服務器鏈接對象
$this->_MQ = new AMQPConnection($config);
if (!$this->_MQ->connect()) {
throw new Exception("鏈接MQ服務失敗", 400);
}
//創建通道鏈接對象
$this->_channel = new AMQPChannel($this->_MQ);
//創建路由對象
$this->_exchange = new AMQPExchange($this->_channel);
//設置交換機名稱
if(!empty($this->_exchangename))
$this->_exchange->setName($this->_exchangename);
//設置交換機類型
if(!empty($this->_exchangeType))
$this->_exchange->setType($this->_exchangeType);
//設置交換機flags
if(!empty($this->_exchangeFlags))
$this->_exchange->setFlags($this->_exchangeFlags);
//創建交換機
$this->_exchange->declareExchange();
//創建隊列對象
$this->_queueobj = new AMQPQueue($this->_channel);
//設置隊列名稱
$this->_queueobj ->setName($this->_queue);
//設置隊列flags;
$this->_queueobj->setFlags($this->_queueFlags);
//創建隊列
$this->_queueobj ->declareQueue();
//將隊列和交換機綁定道路由key
$this->_queueobj ->bind($this->_exchangename,$this->_route_key);
}
/**
* 發布消息
*/
public function publish($content){
$this->_exchange->publish($content,$this->_route_key);
}
/**
* 獲取消息
*/
public function getMsg(){
$info = $this->_queueobj->get(AMQP_AUTOACK)->getBody();
return $info;
}
}
##########################
######### 類調用 #########
##########################
$config = array(
"host" => "10.100.13.142",
"port" => "5672",
"user_name" => "gedai",
"password" => "*****",
"queue_name" => "contract_request",
"route_key" => "contract_request",
"vhost" => "/credithc",
"exchangeType" => AMQP_EX_TYPE_DIRECT,
"flags" => AMQP_DURABLE,
"exchangename" => "CREDITHC_CS"
);
header("Content-type:text/html;charset=utf-8");
include_once("RabbitMQ.class.php");
try{
$MQ = RabbitMQ :: init();
$MQ -> config($config);
$MQ -> connect();
$MQ -> publish("test");
$ret = $MQ -> getMsg();
print_r($ret);
}catch(Exception $e){
var_dump($e);
}