前言:
之前有寫過死信隊列的使用場景以及通過管控台創建死信。這次就通過代碼實現死信隊列的創建,同時也分享一下RabbitMQ封裝的類。
准備:
1. 先准備一個死信隊列(最后用來消費)的參數配置,包括虛擬機,交換機,隊列,有效時間等,如下。
2. 按照上面在RabbitMQ中創建虛擬機和交換機,死信隊列。並讓交換機與死信隊列綁定,操作方法前面有介紹。
3. 這里就直接提供rabbitMQ操作的基本封裝的類,包括一個基類,生產者類,消費者類。
3.1. 基類。
<?php namespace rabbitmq; /** Member * AMQPChannel * AMQPConnection * AMQPEnvelope * AMQPExchange * AMQPQueue * Class BaseMQ * @package rabbitMQ */ class BaseMQ { /** MQ Channel * @var \AMQPChannel */ public $AMQPChannel ; /** MQ Link * @var \AMQPConnection */ public $AMQPConnection ; /** MQ Envelope * @var \AMQPEnvelope */ public $AMQPEnvelope ; /** MQ Exchange * @var \AMQPExchange */ public $AMQPExchange ; /** MQ Queue * @var \AMQPQueue */ public $AMQPQueue ; /** conf * @var */ public $conf ; /** exchange * @var */ public $exchange ; /** * queue * @var */ public $queue; /** * routes * @var */ public $route; /** * queue_args * @var */ public $queueArgs; /** link * BaseMQ constructor. * @throws \AMQPConnectionException */ public function __construct($host,$options,$args = []) { $config = include 'config/config.php'; if (!$config) throw new \AMQPConnectionException('config error!'); $this->host = array_merge($config,$host); isset($options['vhost']) && $this->host['vhost'] = $options['vhost']; $this->exchange = $options['exchange']; $this->queue = $options['queue']; $this->route = $options['route']; $this->queueArgs = $args; $this->AMQPConnection = new \AMQPConnection($this->host); if (!$this->AMQPConnection->connect()) throw new \AMQPConnectionException("Cannot connect to the broker!\n"); } /** * close link */ public function close() { $this->AMQPConnection->disconnect(); } /** Channel * @return \AMQPChannel * @throws \AMQPConnectionException */ public function channel() { if (!$this->AMQPChannel) { $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection); } return $this->AMQPChannel; } /** Exchange * @return \AMQPExchange * @throws \AMQPConnectionException * @throws \AMQPExchangeException */ public function exchange() { if (!$this->AMQPExchange) { $this->AMQPExchange = new \AMQPExchange($this->channel()); $this->AMQPExchange->setName($this->exchange); } return $this->AMQPExchange ; } /** queue * @return \AMQPQueue * @throws \AMQPConnectionException * @throws \AMQPQueueException */ public function queue() { if (!$this->AMQPQueue) { $this->AMQPQueue = new \AMQPQueue($this->channel()); } return $this->AMQPQueue ; } /** Envelope * @return \AMQPEnvelope */ public function envelope() { if (!$this->AMQPEnvelope) { $this->AMQPEnvelope = new \AMQPEnvelope(); } return $this->AMQPEnvelope; } }
3.2. 生產者類。
<?php //生產 namespace rabbitmq; class ProductMQ extends BaseMQ { /** 只控制發送成功 不接受消費者是否收到 * @throws \AMQPChannelException * @throws \AMQPConnectionException * @throws \AMQPExchangeException */ public function publish($message) { $message = is_array($message)?json_encode($message):$message; //頻道 $channel = $this->channel(); //創建交換機對象 $ex = $this->exchange(); return $ex->publish($message, $this->route, AMQP_NOPARAM, array('delivery_mode' => 2)); } }
3.3. 消費者。
<?php namespace rabbitmq; class ConsumerMQ extends BaseMQ { /** 接受消息 如果終止 重連時會有消息 * @throws \AMQPChannelException * @throws \AMQPConnectionException * @throws \AMQPExchangeException * @throws \AMQPQueueException */ public function run($processMessage) { // 創建交換機 $ex = $this->exchange(); // direct類型 $ex->setType(AMQP_EX_TYPE_DIRECT); // 持久化 $ex->setFlags(AMQP_DURABLE); // 不存在就發布 $ex->declareExchange(); // 創建隊列 $q = $this->queue(); // 設置隊列名稱 $q->setName($this->queue); // 持久化 $q->setFlags(AMQP_DURABLE); // 隊列參數 is_array($this->queueArgs) && $q->setArguments($this->queueArgs); //echo "Message Total:".$q->declareQueue()."\n"; $q->declareQueue(); //綁定交換機與隊列,並指定路由鍵 // echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."\n"; $q->bind($this->exchange, $this->route); //阻塞模式接收消息 // echo "Message:\n"; if (!is_null($processMessage)) { while (True) { $q->consume($processMessage); } } $this->close(); } }
編碼:
上面的死信隊列已經創建好了,接下來主要就是通過代碼創建一個用於直接生產消息的普通隊列,但是這個隊列需要設置三個參數。
x-dead-letter-exchange: 關聯死信的交換機
x-dead-letter-routing-key 關聯死信的路由key
x-message-ttl 當前隊列消息的有效期,也就是多久后消息自動進行死信隊列,並且從本隊列刪除
1. 代碼部分:
public function addToDlx()
{
$host = [
'host' => '127.0.0.1', 'port' => '5672', 'login' => 'guest', 'password' => 'guest', 'vhost' => 'report', 'heartbeat' => 60 ]; // 普通隊列 $normal = [ 'vhost' => 'report', // 虛擬機 'exchange' => 'normal', // 交換機 'route' => 'normal_route', // 路由key - 用於交換機與隊列進行綁定 'queue' => 'normal_queue', // 隊列 'expire' => 1000*60, // 有效時間單位:毫秒 - 1分鍾 ]; // 死信隊列 $normal_dlx = [ 'vhost' => 'report', 'exchange' => 'normal_dlx', 'route' => 'normal_dlx_route', 'queue' => 'normal_dlx_queue' ]; // 給普通隊列關聯死信隊列,攜帶的參數 $dlx_args = [ 'x-dead-letter-exchange' => $normal_dlx['exchange'], 'x-dead-letter-routing-key' => $normal_dlx['route'], 'x-message-ttl' => $normal['expire'], ]; //////////////// 通過消費者方式創建死信隊列///////////// $dlx_mq = new ConsumerMQ($host,$normal,$dlx_args); $dlx_mq->run(null); //////////////////////////////////////////////////////// //////////////// 將消息放入普通隊列///////////////////// $mq = new ProductMQ($host, $normal); $param = json_encode([ 'name' => 'test', 'id' => 11568, 'remark' => '測試一下' ]); $mq->publish($param); $mq->close(); //////////////////////////////////////////////////////// }
2. 測試結果:
通過postman點擊上面接口,控制台就可以看出多出了一個normal隊列,並且隊列的 Features 為“ D TTL DLX DLK ”,$param的消息也會首先進入“normal”隊列。
2. 1分鍾后(自己設置的),normal的消息會失效,進而開始添加到了死信隊列“normal_dxl”,可以點擊死信查看最新的消息信息。