1.TTL+死信隊列(DLX)實現
TTL(x-message-ttl)是指隊列中的消息在丟棄之前的可存活時間。死信隊列是放置沒有被成功消費且超過了TTL生存時間消息的隊列,如果消息沒有在指定的TTL時間內被成功消費,並且給需要延遲執行的隊列綁定了死信交換機和死信隊列,將信息publish到死信交換機中后可被綁定交換機的死信隊列消費,利用這一特性可以實現延遲隊列。
消息隊列中的消息會在一下幾種情況下變成死信
消息被拒絕(basic.reject / basic.nack),並且requeue = false; 消息TTL過期; 隊列達到最大長度;
在聲明被延遲的任務隊列前,需要配置如下參數。x-message-ttl設置隊列中消息的生存期,超過這個時間消息將變成死信,也可以在單條消息publish的時候設置ttl,rabbitmq會取兩者中較小者。
$arguments = [ 'x-message-ttl' => 6000, //消息在丟棄之前的可存活時間 'x-dead-letter-exchange' => $deadExchangeName, //死信發送的交換機名字 'x-dead-letter-routing-key' => $deadRouteKey, //死信的路由鍵 ]; $queue->setArguments($arguments);
消費者代碼
//創建連接和channel $connect = new AMQPConnection($config); if (!$connect->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($connect); //**********************創建一個用於存放死信的交換機和隊列************* $deadExchangeName = 'dead_exchange'; $deadQueueName = 'delayed_order'; $deadRouteKey = 'delayed_order'; $deadExchange = new AMQPExchange($channel); $deadExchange->setName($deadExchangeName); $deadExchange->setType(AMQP_EX_TYPE_DIRECT); $deadExchange->declareExchange(); $deadQueue = new AMQPQueue($channel); $deadQueue->setName($deadQueueName); $deadQueue->declareQueue(); $deadQueue->bind($deadExchange->getName(), $deadRouteKey); //***********************創建被延遲的交換機和消息隊列******************** $exchangeName = 'exchange1'; $queueName = 'order'; $routeKey = 'order'; $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_DIRECT); // 1:不持久化到磁盤,宕機數據消失 2:持久化到磁盤 // $exchange->setFlags(AMQP_DURABLE); // 聲明交換機 $exchange->declareExchange(); // 創建消息隊列 $queue = new AMQPQueue($channel); $queue->setName($queueName); $arguments = [ 'x-message-ttl' => 6000, 'x-dead-letter-exchange' => $deadExchangeName, //死信發送的交換機 'x-dead-letter-routing-key' => $deadRouteKey, //死信routeKey ]; // 設置持久性 // $queue->setFlags(AMQP_DURABLE); $queue->setArguments($arguments); // 聲明消息隊列 $queue->declareQueue(); $queue->bind($exchange->getName(), $routeKey); // 向服務器隊列推送10條消息 $msg = 'hello world 1'; $exchange->publish($msg, $routeKey, AMQP_NOPARAM, ['delivery_mode' => 2]);
生產者代碼
$exchangeName = 'dead_exchange'; $queueName = 'delayed_order'; $routeKey = 'delayed_order'; //創建連接和channel $connect = new AMQPConnection($config); if (!$connect->connect()) { die("Cannot connect to the broker!\n"); } $channel = new AMQPChannel($connect); $exchange = new AMQPExchange($channel); $exchange->setName($exchangeName); $exchange->setType(AMQP_EX_TYPE_DIRECT); // 1:不持久化到磁盤,宕機數據消失 2:持久化到磁盤 // $exchange->setFlags(AMQP_DURABLE); // 聲明交換機 $exchange->declareExchange(); // 創建消息隊列 $queue = new AMQPQueue($channel); $queue->setName($queueName); // $queue->setArgument('x-message-ttl', 5000); // 設置持久性 // $queue->setFlags(AMQP_DURABLE); // 聲明消息隊列 $queue->declareQueue(); $queue->bind($exchange->getName(), $routeKey); // 接收消息並處理回調 $queue->consume('receive'); // 處理回調的方法 function receive($envelop, $queue){ echo $envelop->getBody() . "\n"; // ACK 通知生產者任務完成 $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM); }