【RabbitMQ 實戰指南】一 延遲隊列


1、什么是延遲隊列

延遲隊列中存儲延遲消息,延遲消息是指當消息被發送到隊列中不會立即消費,而是等待一段時間后再消費該消息。

延遲隊列很多應用場景,一個典型的應用場景是訂單未支付超時取消,用戶下單之后30分鍾內未支付成功,則把訂單取消。

2、使用要求

RabbitMQ 本身沒有直接支持延遲隊列的功能,但是可以通過過期時間TTL和死信隊列來模擬延遲隊列。

過期時間TTL 可以參考文章: 【RabbitMQ 實戰指南】一 過期時間TTL

死信隊列可以參考文章:【RabbitMQ 實戰指南】一 死信隊列

3、延遲隊列測試

采用訂單未支付超時取消的應用場景來做測試,其具體步驟如下:

  • 1、創建兩個交換器 exchange.order 和 exchange.delay, 分別綁定兩個隊列 queue.order 和 queue.delay

  • 2、把 queue.delay 隊列里面的消息配置過期時間,一般訂單是30分鍾,這里設置成10秒,然后通過 x-dead-letter-exchange 指定死信交換器為 exchange.delay

  • 3、發送消息到 queue.order 中,消息過期之后流入 exchange.delay,然后路由到 queue.delay 隊列中,然后檢查訂單狀態,如果未支付,則進行取消操作

3.1、生產者代碼

<?php 
require __DIR__ . '/../../../../vendor/autoload.php';

use PhpAmqpLib\Wire\AMQPTable;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Connection\AMQPStreamConnection;

// todo 更改配置
$connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/');

$channel = $connection->channel();

$channel->exchange_declare('exchange.order', AMQPExchangeType::DIRECT, false, true);
$channel->exchange_declare('exchange.delay', AMQPExchangeType::DIRECT, false, true);
$args = new AMQPTable();
// 消息過期方式:設置 queue.order 隊列中的消息10s之后過期
$args->set('x-message-ttl', 10000);
$args->set('x-dead-letter-exchange', 'exchange.delay');
$args->set('x-dead-letter-routing-key', 'routingkey.delay');
$channel->queue_declare('queue.order', false, true, false, false, false, $args);
$channel->queue_declare('queue.delay', false, true, false, false);

$channel->queue_bind('queue.order', 'exchange.order', 'routingkey.cancel.order');
$channel->queue_bind('queue.delay', 'exchange.delay', 'routingkey.delay');
$message = new AMQPMessage('F20190413180108970');
$channel->basic_publish($message, 'exchange.order', 'routingkey.cancel.order'); $channel->close(); $connection->close();

運行生產者代碼之后,queue.order 隊列會有一條消息,如下圖:

10秒之后,消息會過期,然后被進入 exchange.delay, 進而路由到 queue.delay 隊列中:

3.2、消費者代碼

<?php 
require __DIR__ . '/../../../../vendor/autoload.php';

use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Connection\AMQPStreamConnection;

// todo 更改配置
$connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/');
$channel = $connection->channel();

$channel->exchange_declare('exchange.delay', AMQPExchangeType::DIRECT, false, true);
$channel->queue_declare('queue.delay', false, true, false, false);

$channel->queue_bind('queue.delay', 'exchange.delay', 'routingkey.delay');

function process_message($message)
{
    echo "開始處理訂單,訂單號:" . $message->body . PHP_EOL;
    echo "獲取訂單的狀態,如果未支付,則進行取消訂單操作" . PHP_EOL;
    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
}

$channel->basic_consume('queue.delay', 'cancelOrder', false, false, false, false, 'process_message');

function shutdown($channel, $connection)
{
    $channel->close();
    $connection->close();
}
register_shutdown_function('shutdown', $channel, $connection);

while ($channel ->is_consuming()) {
    $channel->wait();
}

運行消費者代碼之后,會獲取到訂單號,之后可以檢查該訂單的狀態,如果未支付則進行取消操作,如下圖:

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM