1.TTL+死信隊列(DLX)實現
TTL(x-message-ttl)是指隊列中的消息在丟棄之前的可存活時間。死信隊列是放置沒有被成功消費且超過了TTL生存時間消息的隊列,如果消息沒有在指定的TTL時間內被成功消費,並且給需要延遲執行的隊列綁定了死信交換機和死信隊列,將信息publish到死信交換機中后可被綁定交換機的死信隊列消費,利用這一特性可以實現延遲隊列。
消息隊列中的消息會在一下幾種情況下變成死信
消息被拒絕(basic.reject / basic.nack),並且requeue = false; 消息TTL過期; 隊列達到最大長度;
在聲明被延遲的任務隊列前,需要配置如下參數。x-message-ttl設置隊列中消息的生存期,超過這個時間消息將變成死信,也可以在單條消息publish的時候設置ttl,rabbitmq會取兩者中較小者。
$arguments = [
'x-message-ttl' => 6000, //消息在丟棄之前的可存活時間
'x-dead-letter-exchange' => $deadExchangeName, //死信發送的交換機名字
'x-dead-letter-routing-key' => $deadRouteKey, //死信的路由鍵
];
$queue->setArguments($arguments);
消費者代碼
//創建連接和channel
$connect = new AMQPConnection($config);
if (!$connect->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($connect);
//**********************創建一個用於存放死信的交換機和隊列*************
$deadExchangeName = 'dead_exchange';
$deadQueueName = 'delayed_order';
$deadRouteKey = 'delayed_order';
$deadExchange = new AMQPExchange($channel);
$deadExchange->setName($deadExchangeName);
$deadExchange->setType(AMQP_EX_TYPE_DIRECT);
$deadExchange->declareExchange();
$deadQueue = new AMQPQueue($channel);
$deadQueue->setName($deadQueueName);
$deadQueue->declareQueue();
$deadQueue->bind($deadExchange->getName(), $deadRouteKey);
//***********************創建被延遲的交換機和消息隊列********************
$exchangeName = 'exchange1';
$queueName = 'order';
$routeKey = 'order';
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
// 1:不持久化到磁盤,宕機數據消失 2:持久化到磁盤
// $exchange->setFlags(AMQP_DURABLE);
// 聲明交換機
$exchange->declareExchange();
// 創建消息隊列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$arguments = [
'x-message-ttl' => 6000,
'x-dead-letter-exchange' => $deadExchangeName, //死信發送的交換機
'x-dead-letter-routing-key' => $deadRouteKey, //死信routeKey
];
// 設置持久性
// $queue->setFlags(AMQP_DURABLE);
$queue->setArguments($arguments);
// 聲明消息隊列
$queue->declareQueue();
$queue->bind($exchange->getName(), $routeKey);
// 向服務器隊列推送10條消息
$msg = 'hello world 1';
$exchange->publish($msg, $routeKey, AMQP_NOPARAM, ['delivery_mode' => 2]);
生產者代碼
$exchangeName = 'dead_exchange';
$queueName = 'delayed_order';
$routeKey = 'delayed_order';
//創建連接和channel
$connect = new AMQPConnection($config);
if (!$connect->connect()) {
die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);
// 1:不持久化到磁盤,宕機數據消失 2:持久化到磁盤
// $exchange->setFlags(AMQP_DURABLE);
// 聲明交換機
$exchange->declareExchange();
// 創建消息隊列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
// $queue->setArgument('x-message-ttl', 5000);
// 設置持久性
// $queue->setFlags(AMQP_DURABLE);
// 聲明消息隊列
$queue->declareQueue();
$queue->bind($exchange->getName(), $routeKey);
// 接收消息並處理回調
$queue->consume('receive');
// 處理回調的方法
function receive($envelop, $queue){
echo $envelop->getBody() . "\n";
// ACK 通知生產者任務完成
$queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
}
