PHP RabbitMQ實現簡單的延遲隊列


1.TTL+死信隊列(DLX)實現

TTL(x-message-ttl)是指隊列中的消息在丟棄之前的可存活時間。死信隊列是放置沒有被成功消費且超過了TTL生存時間消息的隊列,如果消息沒有在指定的TTL時間內被成功消費,並且給需要延遲執行的隊列綁定了死信交換機和死信隊列,將信息publish到死信交換機中后可被綁定交換機的死信隊列消費,利用這一特性可以實現延遲隊列。

 

消息隊列中的消息會在一下幾種情況下變成死信

消息被拒絕(basic.reject / basic.nack),並且requeue = false;
消息TTL過期;
隊列達到最大長度;

 

在聲明被延遲的任務隊列前,需要配置如下參數。x-message-ttl設置隊列中消息的生存期,超過這個時間消息將變成死信,也可以在單條消息publish的時候設置ttl,rabbitmq會取兩者中較小者。

$arguments = [
    'x-message-ttl' => 6000, //消息在丟棄之前的可存活時間
    'x-dead-letter-exchange' => $deadExchangeName, //死信發送的交換機名字
    'x-dead-letter-routing-key' => $deadRouteKey, //死信的路由鍵
];
$queue->setArguments($arguments);
 

 

消費者代碼

//創建連接和channel
$connect = new AMQPConnection($config);

if (!$connect->connect()) {
    die("Cannot connect to the broker!\n");
}

$channel  = new AMQPChannel($connect);

//**********************創建一個用於存放死信的交換機和隊列*************
$deadExchangeName = 'dead_exchange';
$deadQueueName    = 'delayed_order';
$deadRouteKey     = 'delayed_order';

$deadExchange = new AMQPExchange($channel);
$deadExchange->setName($deadExchangeName);
$deadExchange->setType(AMQP_EX_TYPE_DIRECT);
$deadExchange->declareExchange();

$deadQueue = new AMQPQueue($channel);
$deadQueue->setName($deadQueueName);
$deadQueue->declareQueue();
$deadQueue->bind($deadExchange->getName(), $deadRouteKey);


//***********************創建被延遲的交換機和消息隊列********************
$exchangeName = 'exchange1';
$queueName    = 'order';
$routeKey     = 'order';
$exchange = new AMQPExchange($channel);
$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);

// 1:不持久化到磁盤,宕機數據消失 2:持久化到磁盤
// $exchange->setFlags(AMQP_DURABLE);

// 聲明交換機
$exchange->declareExchange();

// 創建消息隊列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
$arguments = [
    'x-message-ttl' => 6000,
    'x-dead-letter-exchange' => $deadExchangeName, //死信發送的交換機
    'x-dead-letter-routing-key' => $deadRouteKey, //死信routeKey
];

// 設置持久性
// $queue->setFlags(AMQP_DURABLE);

$queue->setArguments($arguments);
// 聲明消息隊列
$queue->declareQueue();

$queue->bind($exchange->getName(), $routeKey);

// 向服務器隊列推送10條消息
$msg = 'hello world 1';
$exchange->publish($msg, $routeKey, AMQP_NOPARAM, ['delivery_mode' => 2]);

 

生產者代碼

$exchangeName = 'dead_exchange';
$queueName    = 'delayed_order';
$routeKey     = 'delayed_order';
 
//創建連接和channel
$connect = new AMQPConnection($config);

if (!$connect->connect()) {
    die("Cannot connect to the broker!\n");
}

$channel  = new AMQPChannel($connect);
$exchange = new AMQPExchange($channel);

$exchange->setName($exchangeName);
$exchange->setType(AMQP_EX_TYPE_DIRECT);

// 1:不持久化到磁盤,宕機數據消失 2:持久化到磁盤
// $exchange->setFlags(AMQP_DURABLE);

// 聲明交換機
$exchange->declareExchange();

// 創建消息隊列
$queue = new AMQPQueue($channel);
$queue->setName($queueName);
// $queue->setArgument('x-message-ttl', 5000);
// 設置持久性
// $queue->setFlags(AMQP_DURABLE);

// 聲明消息隊列
$queue->declareQueue();

$queue->bind($exchange->getName(), $routeKey);

// 接收消息並處理回調
$queue->consume('receive');

// 處理回調的方法
function receive($envelop, $queue){
    echo $envelop->getBody() . "\n";

    // ACK 通知生產者任務完成
    $queue->ack($envelop->getDeliveryTag(), AMQP_NOPARAM);
}

  

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM