前言:
有些人為了讓項目快速上線,服務器往往安裝寶塔面板,然后再極速安裝LNMP。盡管環境搭建的時間省了,但是寶塔上PHP中擴展包沒有提供AMQP。這時候只是為了使用消息隊列而對PHP大動干戈, 不如使用一個PHP AMQP的庫,即用即裝,不對環境造成影響。
簡介:
php-amqplib 客戶端庫,通過composer安裝,不需要在PHP中安裝擴展,以下為兩種不同的安裝方式。
1. 項目中新建composer.json,添加如下代碼,然后composer install
{
"require": { "php-amqplib/php-amqplib": " 2.6.*" } }
2. 命令進入到項目,然后 composer require php-amqplib/php-amqplib 2.6.*
RabbitMQ設置:
1. 進入web管控台,添加新用戶,角色管理員,任何IP上都可以登錄,授權指定虛擬機。
2. 添加交換機
3. 添加隊列並與交互機綁定。
編碼:
1. 封裝rabbitMQ類。
<?php use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; /** * Class RabbitMQ. */ class RabbitMQ { const READ_LINE_NUMBER = 0; const READ_LENGTH = 1; const READ_DATA = 2; public $config; public static $prefix = 'autoinc_key:'; protected $exchangeName = 'flow'; protected $queueName = 'flow_queue'; /** * @var \PhpAmqpLib\Connection\AMQPStreamConnection */ protected $connection; /** * @var \PhpAmqpLib\Channel\AMQPChannel */ protected $channel; protected $queue; //配置項 private $host; private $port; private $user; private $pass; private $vhost; public function __construct($config = []) { //$this->config = $config; //設置rabbitmq配置值 $this->host = '192.168.1.101'; $this->port = 5672; $this->user = 'beiqiaosu'; $this->pass = 'beiqiaosu'; $this->vhost = 'report'; $this->connect(); } public function __call($method, $args = []) { $reConnect = false; while (1) { try { $this->initChannel(); $result = call_user_func_array([$this->channel, $method], $args); } catch (\Exception $e) { //已重連過,仍然報錯 if ($reConnect) { throw $e; } \Swoole::$php->log->error(__CLASS__ . ' [' . posix_getpid() . "] Swoole RabbitMQ[{$this->config['host']}:{$this->config['port']}] Exception(Msg=" . $e->getMessage() . ', Code=' . $e->getCode() . "), RabbitMQ->{$method}, Params=" . var_export($args, 1)); if ($this->connection) { $this->close(); } $this->connect(); $reConnect = true; continue; } return $result; } //不可能到這里 return false; } /** * 連接rabbitmq消息隊列. * * @return bool */ public function connect() { try { if ($this->connection) { unset($this->connection); } $this->connection = new AMQPStreamConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost); } catch (\Exception $e) { echo __CLASS__ ."Swoole RabbitMQ Exception'".$e->getMessage(); return false; } } /** * 關閉連接. */ public function close() { $this->channel->close(); $this->connection->close(); } /** * 設置交換機名稱. * * @param string $exchangeName */ public function setExchangeName($exchangeName = '') { $exchangeName && $this->exchangeName = $exchangeName; } /** * 設置隊列名稱. * * @param string $queueName */ public function setQueueName($queueName = '') { $queueName && $this->queueName = $queueName; } /** * 設置頻道. */ public function initChannel() { if (!$this->channel) { //通道 $this->channel = $this->connection->channel(); $this->channel->queue_declare($this->queueName, false, true, false, false); $this->channel->exchange_declare($this->exchangeName, 'direct', false, true, false); $this->channel->queue_bind($this->queueName, $this->exchangeName); } } /** * 獲取隊列數據. * * @return mixed */ public function pop() { while (1) { try { $this->connect(); $this->initChannel(); $message = $this->channel->basic_get($this->queueName); if ($message) { $this->channel->basic_ack($message->delivery_info['delivery_tag']); $result = $message->body; } else { throw new \Exception('Empty Queue Data'); } } catch (\Exception $e) { //\Swoole::$php->log->error(__CLASS__ . " [" . posix_getpid() . "] Swoole RabbitMQ[{$this->config['host']}:{$this->config['port']}] Exception(Msg=" . $e->getMessage() . ", Code=" . $e->getCode() . ")"); sleep(1); continue; } return $result; } //不可能到這里 return false; } /** * 插入隊列數據. * * @param $data * * @return bool */ public function push($data) { while (1) { try { $this->connect(); $this->initChannel(); $message = new AMQPMessage($data, ['content_type'=>'text/plain', 'devlivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); $this->channel->basic_publish($message, $this->exchangeName); } catch (\Exception $e) { echo "$e->getMessage()"; continue; } return true; } //不可能到這里 return false; } }
2. 操作mq,出隊,入隊。
<?php require_once "vendor/autoload.php"; require_once "component/RabbitMQ.php"; $mq = new RabbitMQ(); // 消息消費測試 /*try { $res = $mq->pop(); }catch(\Exception $e) { var_dump($e->getMessage());die; }*/ // 消息生產測試 try { $res = $mq->push(json_encode(['name'=>'beiqiaosu','order_id'=>'2020070115261425155'])); }catch(\Exception $e) { var_dump($e->getMessage());die; } var_dump($res);die;
測試:
1. 先通過生產消息(入隊)方法運行一下,然后進入隊列中get message查看消息總數。
2. 測試調用消費,再查看總數。
關注公眾號,回復 “寶塔MQ” 獲取demo源碼。