1、什么是延遲隊列
延遲隊列中存儲延遲消息,延遲消息是指當消息被發送到隊列中不會立即消費,而是等待一段時間后再消費該消息。
延遲隊列很多應用場景,一個典型的應用場景是訂單未支付超時取消,用戶下單之后30分鍾內未支付成功,則把訂單取消。
2、使用要求
RabbitMQ 本身沒有直接支持延遲隊列的功能,但是可以通過過期時間TTL和死信隊列來模擬延遲隊列。
過期時間TTL 可以參考文章: 【RabbitMQ 實戰指南】一 過期時間TTL
死信隊列可以參考文章:【RabbitMQ 實戰指南】一 死信隊列
3、延遲隊列測試
采用訂單未支付超時取消的應用場景來做測試,其具體步驟如下:
-
1、創建兩個交換器 exchange.order 和 exchange.delay, 分別綁定兩個隊列 queue.order 和 queue.delay
-
2、把 queue.delay 隊列里面的消息配置過期時間,一般訂單是30分鍾,這里設置成10秒,然后通過 x-dead-letter-exchange 指定死信交換器為 exchange.delay
-
3、發送消息到 queue.order 中,消息過期之后流入 exchange.delay,然后路由到 queue.delay 隊列中,然后檢查訂單狀態,如果未支付,則進行取消操作
3.1、生產者代碼
<?php require __DIR__ . '/../../../../vendor/autoload.php'; use PhpAmqpLib\Wire\AMQPTable; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Connection\AMQPStreamConnection; // todo 更改配置 $connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/'); $channel = $connection->channel(); $channel->exchange_declare('exchange.order', AMQPExchangeType::DIRECT, false, true); $channel->exchange_declare('exchange.delay', AMQPExchangeType::DIRECT, false, true); $args = new AMQPTable(); // 消息過期方式:設置 queue.order 隊列中的消息10s之后過期 $args->set('x-message-ttl', 10000); $args->set('x-dead-letter-exchange', 'exchange.delay'); $args->set('x-dead-letter-routing-key', 'routingkey.delay'); $channel->queue_declare('queue.order', false, true, false, false, false, $args); $channel->queue_declare('queue.delay', false, true, false, false); $channel->queue_bind('queue.order', 'exchange.order', 'routingkey.cancel.order'); $channel->queue_bind('queue.delay', 'exchange.delay', 'routingkey.delay'); $message = new AMQPMessage('F20190413180108970');
$channel->basic_publish($message, 'exchange.order', 'routingkey.cancel.order'); $channel->close(); $connection->close();
運行生產者代碼之后,queue.order 隊列會有一條消息,如下圖:
10秒之后,消息會過期,然后被進入 exchange.delay, 進而路由到 queue.delay 隊列中:
3.2、消費者代碼
<?php require __DIR__ . '/../../../../vendor/autoload.php'; use PhpAmqpLib\Message\AMQPMessage; use PhpAmqpLib\Exchange\AMQPExchangeType; use PhpAmqpLib\Connection\AMQPStreamConnection; // todo 更改配置 $connection = new AMQPStreamConnection('192.168.33.1', 5672, 'zhangcs', 'zhangcs', '/'); $channel = $connection->channel(); $channel->exchange_declare('exchange.delay', AMQPExchangeType::DIRECT, false, true); $channel->queue_declare('queue.delay', false, true, false, false); $channel->queue_bind('queue.delay', 'exchange.delay', 'routingkey.delay'); function process_message($message) { echo "開始處理訂單,訂單號:" . $message->body . PHP_EOL; echo "獲取訂單的狀態,如果未支付,則進行取消訂單操作" . PHP_EOL; $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); } $channel->basic_consume('queue.delay', 'cancelOrder', false, false, false, false, 'process_message'); function shutdown($channel, $connection) { $channel->close(); $connection->close(); } register_shutdown_function('shutdown', $channel, $connection); while ($channel ->is_consuming()) { $channel->wait(); }
運行消費者代碼之后,會獲取到訂單號,之后可以檢查該訂單的狀態,如果未支付則進行取消操作,如下圖: