封裝php的RabbitMq


這段時間一直業務比較忙,因公司用的 databases 隊列,用起來 感覺不是很爽,故簡單封裝了一個rabbitmq類(業務代碼隨便寫的)

首先是賬號密碼配置

config.php

<?php
    return $arr = [
        'RabbitMq' => [
            // Rabbitmq 服務地址
            'host' => '127.0.0.1',
            // Rabbitmq 服務端口
            'port' => '5672',
            // Rabbitmq 帳號
            'login' => 'guest',
            // Rabbitmq 密碼
            'password' => 'guest',
            'vhost'=>'/'
        ]
    ];

基類 base.php 

<?php
include dirname(__FILE__).'/object.php';
include dirname(__FILE__).'/config.php';
class RabbitMq implements object
{
    //保存類實例的靜態成員變量
    static private $_instance;
    static private $_conn;
    static private $amp ;
    static private $route = 'key_1';
    static private $q ;
    static private $ex ;
    static private $queue;
    public static function getInstance(){
        global $arr;
        if (!(self::$_instance instanceof self)) {
            self::$_instance = new self($arr['RabbitMq']);
            return self::$_instance;
        }
        return self::$_instance;
    }

    private function __construct($conn)
    {
        //創建連接和channel
        $conn = new AMQPConnection($conn);
        if(!$conn->connect()) {
            die("Cannot connect to the broker!\n");
        }
        self::$_conn = new AMQPChannel($conn);
        self::$amp = $conn;
    }
    /* *
     *
     * parm 交換機名
     * parm 隊列名
     *
     * */
    public function listen($exchangeName,$queuename){
        self::$queue = $queuename;
        return $this->setExchange($exchangeName,$queuename);
    }

    //連接交換機
    public function setExchange($exchangeName,$queueName){
        //創建交換機
        $ex = new AMQPExchange(self::$_conn);
        self::$ex = $ex;
        $ex->setName($exchangeName);

        $ex->setType(AMQP_EX_TYPE_DIRECT); //direct類型
        $ex->setFlags(AMQP_DURABLE); //持久化
        $ex->declare();
        return self::setQueue($queueName,$exchangeName);
    }

    //創建隊列
    private static function setQueue($queueName,$exchangeName){
    //  創建隊列
        $q = new AMQPQueue(self::$_conn);
        $q->setName($queueName);
        $q->setFlags(AMQP_DURABLE);
        $q->declareQueue();

    // 用於綁定隊列和交換機
        $routingKey = self::$route;
        $q->bind($exchangeName,  $routingKey);
        self::$q = $q;
        return(self::$_instance);
    }

    /*
 * 消費者
 * $fun_name = array($classobj,$function) or function name string
 * $autoack 是否自動應答
 *
 * function processMessage($envelope, $queue) {
        $msg = $envelope->getBody();
        echo $msg."\n"; //處理消息
        $queue->ack($envelope->getDeliveryTag());//手動應答
    }
 */
    public function run($func, $autoack = True){
        if (!$func || !self::$q) return False;
        while(True){
            if ($autoack) {
                if(!self::$q->consume($func, AMQP_AUTOACK)){
//                    self::$q->ack($envelope->getDeliveryTag());
                   //失敗之后會默認進入 noack 隊列。下次重新開啟會再次調用,目前還不清楚 回調配置應該這里做一個失敗反饋
             //todu } } self::$q->consume($func); } } private static function closeConn(){ self::$amp->disconnect(); } public function pushlish($msg){ while (1) { sleep(1); if (self::$ex->publish(date('H:i:s') . "用戶" . "注冊", self::$route)) { //寫入文件等操作 echo $msg; } } } //__clone方法防止對象被復制克隆 public function __clone() { trigger_error('Clone is not allow!', E_USER_ERROR); } }
consume 監聽類(一個操作對應一個class)
<?php
include dirname(__FILE__).'/base.php';
class Add
{
    public static function run(){

        $dbms='mysql';     //數據庫類型
        $host='127.0.01'; //數據庫主機名
        $dbName='test';    //使用的數據庫
        $user='root';      //數據庫連接用戶名
        $pass='admin';          //對應的密碼
        $dsn="$dbms:host=$host;dbname=$dbName";

        sleep(1);
        try {
            $dbh = new PDO($dsn, $user, $pass); //初始化一個PDO對象
            /*你還可以進行一次搜索操作
            foreach ($dbh->query('SELECT * from FOO') as $row) {
                print_r($row); //你可以用 echo($GLOBAL); 來看到這些值
            }
            */
            $dbh = null;
        } catch (PDOException $e) {
            die ("Error!: " . $e->getMessage() . "<br/>");
        }
//默認這個不是長連接,如果需要數據庫長連接,需要最后加一個參數:array(PDO::ATTR_PERSISTENT => true) 變成這樣:
        $db = new PDO($dsn, $user, $pass, array(PDO::ATTR_PERSISTENT => true));
        $sql = 'INSERT INTO `test`.`t_reg`(`names`) VALUES (9)';
        $row = $db->query($sql);
        if(!$row){
           return false;
        }
            echo 'OK';
    }

}
$consume = new Add();
//tudo
//$s = RabbitMq::getInstance()->listen('jiaohuanji','queue1')->run(array($consume,'run')); 將run函數帶入到consume里面作為回調 在consume里面增加$funname ,增加代碼粘性
$s = RabbitMq::getInstance()->listen('jiaohuanji','queue1')->run(array($consume,'run'));

push 類(發送者)

<?php
include "base.php";

RabbitMq::getInstance()->listen('jiaohuanji','queue1')->pushlish('請求已發送');

接口interface 

<?php
    interface object
    {
        public static function getInstance();
    }

監聽 add.php

執行 send.php 即可完成簡單的rabit操作





免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM