環境:CoentOS,PHP 7
簡單介紹一下php-amqplib
php-amqplib是Advanced Message Queuing Protocol (AMQP)的一個PHP開源實現。高級消息隊列協議(AMQP)是一個異步消息傳遞所使用的應用層協議規范。作為線路層協議,而不是API(例如JMS),AMQP 客戶端能夠無視消息的來源任意發送和接受信息
1、RabbitMQ的安裝
需要下載的兩個包
erlang-21.0.7-1.el7.centos.x86_64.rpm
rabbitmq-server-3.7.7-1.el7.noarch.rpm
這兩個包我已經放在了百度雲盤的分享上
鏈接:https://pan.baidu.com/s/1rMv_yFpLnH-D1S5wrOZrbA#list/path=%2FRabbitMQ
提取碼:ipyu
然后參照 weixin_41368339的博客linux rabbitmq3.7.7安裝與使用一文中的步驟安裝步,基本上沒有什么問題
2、composer的安裝(已安裝的請忽略此步)
為什么要裝這個?我們可以通過composer來下載安裝php-amqplib
如何安裝composer,可以百度一下composer的全局安裝或者直接去composer中文網
3、php-amqplib的下載及安裝
新建一個composer.json的文件,內容如下所示
-
{
-
"require": {
-
"php-amqplib/php-amqplib": ">=2.6.1"
-
}
-
}
然后執行
composer install
會生成一個composer.lock文件及vendor文件夾,vendor文件夾里有php-amqplib庫,且有一個autoload.php文件可以使用自動加載
4、Demo示例
本Demo示例只創建了一個直連交換機,共有四個文件Consumer.php (消費者),Publisher.php (生產者) ,Parenter.php (自己封裝的RabbitMQ的方法) ,以及test.php (測試數據),目錄如圖所示
Parenter.php 代碼如下圖所示
-
-
require_once __DIR__ . '/vendor/autoload.php';
-
-
use PhpAmqpLib\ Connection\ AMQPStreamConnection;
-
use PhpAmqpLib\ Message\ AMQPMessage;
-
abstract class Parenter
-
{
-
//MQ的默認連接配置
-
public $config = array(
-
'host' => '127.0.0.1', //ip
-
'port' => '5672', //端口號
-
'user' => 'guest', //用戶
-
'password' => 'guest', //密碼
-
'vhost' => '/' //虛擬host
-
);
-
-
public $connection; //鏈接
-
public $channel; //信道
-
-
public $exchangeName = ''; //交換機名
-
public $queueName = ''; //隊列名
-
public $routeKey = ''; //路由鍵
-
public $exchangeType = 'direct'; //交換機類型
-
-
public $autoAck = true; //是否自動ack應答
-
-
public function __construct($exchangeName, $queueName, $routeKey, $exchangeType = 'direct', $config=array())
-
{
-
$this->exchangeName = empty($exchangeName) ? '' : $exchangeName;
-
$this->queueName = empty($queueName) ? '' : $queueName;
-
$this->routeKey = empty($routeKey) ? '' : $routeKey;
-
$this->exchangeType = empty($exchangeType) ? '' : 'direct';
-
if(! empty($config))
-
{
-
$this->setConfig($config);
-
}
-
$this->createConnect();
-
}
-
-
//創建連接與信道
-
private function createConnect()
-
{
-
$host = $this->config[ 'host'];
-
$port = $this->config[ 'port'];
-
$user = $this->config[ 'user'];
-
$password = $this->config[ 'password'];
-
$vhost = $this->config[ 'vhost'];
-
if( empty($host) || empty($port) || empty($user) || empty($password))
-
{
-
throw new Exception( 'RabbitMQ的連接配置不正確');
-
}
-
//創建鏈接
-
$this->connection = new AMQPStreamConnection($host, $port, $user, $password, $vhost);
-
//創建信道
-
$this->channel = $this->connection->channel();
-
$this->createExchange();
-
}
-
-
//創建交換機
-
private function createExchange()
-
{
-
//創建交換機$channel->exchange_declare($exhcange_name,$type,$passive,$durable,$auto_delete);
-
//passive: 消極處理, 判斷是否存在隊列,存在則返回,不存在直接拋出 PhpAmqpLib\Exception\AMQPProtocolChannelException 異常
-
//durable:true、false true:服務器重啟會保留下來Exchange。警告:僅設置此選項,不代表消息持久化。即不保證重啟后消息還在
-
//autoDelete:true、false.true:當已經沒有消費者時,服務器是否可以刪除該Exchange
-
$this->channel->exchange_declare( $this->exchangeName, $this->exchangeType, false, true, false);
-
//passive: 消極處理, 判斷是否存在隊列,存在則返回,不存在直接拋出 PhpAmqpLib\Exception\AMQPProtocolChannelException 異常
-
//durable:true、false true:在服務器重啟時,能夠存活
-
//exclusive :是否為當前連接的專用隊列,在連接斷開后,會自動刪除該隊列
-
//autodelete:當沒有任何消費者使用時,自動刪除該隊列
-
//arguments: 自定義規則
-
$this->channel->queue_declare( $this->queueName, false, true, false, false);
-
}
-
-
//發送消息
-
public function sendMessage($data)
-
{
-
//創建消息$msg = new AMQPMessage($data,$properties)
-
//#$data string類型 要發送的消息
-
//#roperties array類型 設置的屬性,比如設置該消息持久化[‘delivery_mode’=>2]
-
$msg = new AMQPMessage($data, array( 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
-
$this->channel->basic_publish($msg, $this->exchangeName, $this->routeKey);
-
}
-
-
//處理消息
-
public function dealMq($flag)
-
{
-
$this->autoAck = $flag;
-
$this->channel->queue_bind( $this->queueName, $this->exchangeName, $this->routeKey);
-
//prefetchSize:0
-
//prefetchCount:會告訴RabbitMQ不要同時給一個消費者推送多於N個消息,即一旦有N個消息還沒有ack,則該consumer將block掉,直到有消息ack
-
//global:true\false 是否將上面設置應用於channel,簡單點說,就是上面限制是channel級別的還是consumer級別
-
//$this->channel->basic_qos(0, 1, false);
-
//1:queue 要取得消息的隊列名
-
//2:consumer_tag 消費者標簽
-
//3:no_local false這個功能屬於AMQP的標准,但是rabbitMQ並沒有做實現.參考
-
//4:no_ack false收到消息后,是否不需要回復確認即被認為被消費
-
//5:exclusive false排他消費者,即這個隊列只能由一個消費者消費.適用於任務不允許進行並發處理的情況下.比如系統對接
-
//6:nowait false不返回執行結果,但是如果排他開啟的話,則必須需要等待結果的,如果兩個一起開就會報錯
-
//7:callback null回調函數
-
//8:ticket null
-
//9:arguments null
-
$this->channel->basic_consume( $this->queueName, '', false, $this->autoAck, false, false, function($msg){ $this->get($msg);});
-
//監聽消息
-
while(count( $this->channel->callbacks)){
-
$this->channel->wait();
-
}
-
}
-
-
public function get($msg)
-
{
-
$param = $msg->body;
-
$this->doProcess($param);
-
if(! $this->autoAck)
-
{
-
//手動ack應答
-
$msg->delivery_info[ 'channel']->basic_ack($msg->delivery_info[ 'delivery_tag']);
-
}
-
}
-
-
abstract public function doProcess($param);
-
-
public function closeConnetct()
-
{
-
$this->channel->close();
-
$this->connection->close();
-
}
-
-
//重新設置MQ的鏈接配置
-
public function setConfig($config)
-
{
-
if (!is_array($config))
-
{
-
throw new Exception( 'config不是一個數組');
-
}
-
foreach($config as $key => $value)
-
{
-
$this->config[$key] = $value;
-
}
-
}
-
}
Consumer.php (消費者)
-
-
include_once( 'Parenter.php');
-
class Consumer extends Parenter
-
{
-
public function __construct()
-
{
-
parent::__construct( 'exchange', 'queue', 'routeKey');
-
}
-
public function doProcess($msg)
-
{
-
echo $msg. "\n";
-
}
-
}
-
$consumer = new Consumer();
-
//$consumer->dealMq(false);
-
$consumer->dealMq( true);
Publisher.php (生產者)
-
-
include_once( 'Parenter.php');
-
class Publisher extends Parenter
-
{
-
public function __construct()
-
{
-
parent::__construct( 'exchange', '', 'routeKey');
-
}
-
public function doProcess($msg)
-
{
-
-
}
-
-
}
test.php(測試數據)
-
-
include_once( 'Publisher.php');
-
$publisher = new Publisher();
-
$publisher->sendMessage( 'Hello,World!');
-
$publisher->closeConnetct();
5、添加交換機與隊列
打開http://ip(你的RabbitMQ安裝的主機):15672/,會進入到RabbitMQ的可視化管理后台登錄頁面,登錄你的賬號密碼(如果你是按照第一步提到的博客里的教程來裝的,那你的賬號密碼就是guest),然后新加交換機和隊列,
以下是新加交換機的操作,注意vhost與以及交換機的名稱要與代碼里的消費者與生產者傳入的參數值保持一致,如果你不想使用"/"這個默認的vhost,也可以新建一個vhost(什么?你問我如何新建,那么請百度一下),但是要記住在代碼里創建消費者與生產者時把你新加的這個vhost傳進去,覆蓋RabbitMqParernt.php里的vhost
以下是新加隊列,這里的vhost要與上一步的vhost保持一致,保證交換機與隊列在同一個vhost下,不然交換機會找不到隊列的,隊列名與消費者代碼里傳入進去的隊列名保持一致
6、運行代碼
先打開一個窗口啟動消費者
運行測試腳本
如果打印出來字符串就成功了
注意:消費者與生產者傳入的交換機名稱,路由鍵必須相同
交換機類型請務必選擇直連,各種交換機的路由鍵形式不大相同,有興趣的同學可以去試試其它類型的交換機實現哦
當修改了vhost或者交換機名稱,隊列名稱等時,需要修改對應代碼
至於注釋里的ack應答,我會在之后的博客里詳細介紹,包括RabbitMQ的持久化,這里使用默認的ack應答即可
代碼里很多注釋都是我后來學習php-amqplib庫中類的方法時加的,表示的是參數的意義,大家也可以去研究一下,這里提供個網址: Rabbitmq各方法的作用詳解
關於管理后台及RabbitMQ的命令,我這里就不多介紹了,有興趣的同學去網上搜索一下就能搜到好多
下一篇:RabbitMQ的持久化(六)