這段時間一直業務比較忙,因公司用的 databases 隊列,用起來 感覺不是很爽,故簡單封裝了一個rabbitmq類(業務代碼隨便寫的)
首先是賬號密碼配置
config.php
<?php return $arr = [ 'RabbitMq' => [ // Rabbitmq 服務地址 'host' => '127.0.0.1', // Rabbitmq 服務端口 'port' => '5672', // Rabbitmq 帳號 'login' => 'guest', // Rabbitmq 密碼 'password' => 'guest', 'vhost'=>'/' ] ];
基類 base.php
<?php include dirname(__FILE__).'/object.php'; include dirname(__FILE__).'/config.php'; class RabbitMq implements object { //保存類實例的靜態成員變量 static private $_instance; static private $_conn; static private $amp ; static private $route = 'key_1'; static private $q ; static private $ex ; static private $queue; public static function getInstance(){ global $arr; if (!(self::$_instance instanceof self)) { self::$_instance = new self($arr['RabbitMq']); return self::$_instance; } return self::$_instance; } private function __construct($conn) { //創建連接和channel $conn = new AMQPConnection($conn); if(!$conn->connect()) { die("Cannot connect to the broker!\n"); } self::$_conn = new AMQPChannel($conn); self::$amp = $conn; } /* * * * parm 交換機名 * parm 隊列名 * * */ public function listen($exchangeName,$queuename){ self::$queue = $queuename; return $this->setExchange($exchangeName,$queuename); } //連接交換機 public function setExchange($exchangeName,$queueName){ //創建交換機 $ex = new AMQPExchange(self::$_conn); self::$ex = $ex; $ex->setName($exchangeName); $ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型 $ex->setFlags(AMQP_DURABLE); //持久化 $ex->declare(); return self::setQueue($queueName,$exchangeName); } //創建隊列 private static function setQueue($queueName,$exchangeName){ // 創建隊列 $q = new AMQPQueue(self::$_conn); $q->setName($queueName); $q->setFlags(AMQP_DURABLE); $q->declareQueue(); // 用於綁定隊列和交換機 $routingKey = self::$route; $q->bind($exchangeName, $routingKey); self::$q = $q; return(self::$_instance); } /* * 消費者 * $fun_name = array($classobj,$function) or function name string * $autoack 是否自動應答 * * function processMessage($envelope, $queue) { $msg = $envelope->getBody(); echo $msg."\n"; //處理消息 $queue->ack($envelope->getDeliveryTag());//手動應答 } */ public function run($func, $autoack = True){ if (!$func || !self::$q) return False; while(True){ if ($autoack) { if(!self::$q->consume($func, AMQP_AUTOACK)){ // self::$q->ack($envelope->getDeliveryTag()); //失敗之后會默認進入 noack 隊列。下次重新開啟會再次調用,目前還不清楚 回調配置應該這里做一個失敗反饋
//todu } } self::$q->consume($func); } } private static function closeConn(){ self::$amp->disconnect(); } public function pushlish($msg){ while (1) { sleep(1); if (self::$ex->publish(date('H:i:s') . "用戶" . "注冊", self::$route)) { //寫入文件等操作 echo $msg; } } } //__clone方法防止對象被復制克隆 public function __clone() { trigger_error('Clone is not allow!', E_USER_ERROR); } }
consume 監聽類(一個操作對應一個class)
<?php include dirname(__FILE__).'/base.php'; class Add { public static function run(){ $dbms='mysql'; //數據庫類型 $host='127.0.01'; //數據庫主機名 $dbName='test'; //使用的數據庫 $user='root'; //數據庫連接用戶名 $pass='admin'; //對應的密碼 $dsn="$dbms:host=$host;dbname=$dbName"; sleep(1); try { $dbh = new PDO($dsn, $user, $pass); //初始化一個PDO對象 /*你還可以進行一次搜索操作 foreach ($dbh->query('SELECT * from FOO') as $row) { print_r($row); //你可以用 echo($GLOBAL); 來看到這些值 } */ $dbh = null; } catch (PDOException $e) { die ("Error!: " . $e->getMessage() . "<br/>"); } //默認這個不是長連接,如果需要數據庫長連接,需要最后加一個參數:array(PDO::ATTR_PERSISTENT => true) 變成這樣: $db = new PDO($dsn, $user, $pass, array(PDO::ATTR_PERSISTENT => true)); $sql = 'INSERT INTO `test`.`t_reg`(`names`) VALUES (9)'; $row = $db->query($sql); if(!$row){ return false; } echo 'OK'; } } $consume = new Add(); //tudo //$s = RabbitMq::getInstance()->listen('jiaohuanji','queue1')->run(array($consume,'run')); 將run函數帶入到consume里面作為回調 在consume里面增加$funname ,增加代碼粘性 $s = RabbitMq::getInstance()->listen('jiaohuanji','queue1')->run(array($consume,'run'));
push 類(發送者)
<?php include "base.php"; RabbitMq::getInstance()->listen('jiaohuanji','queue1')->pushlish('請求已發送');
接口interface
<?php interface object { public static function getInstance(); }
監聽 add.php
執行 send.php 即可完成簡單的rabit操作