rabbitmq重試機制


生產者:

正常的生產消息,不過再消息頭部設置了一個參數 表示消息的嘗試次數

$msg = new AMQPMessage('Hello World');
$headers = new \PhpAmqpLib\Wire\AMQPTable([
    "retry_nums"=>0
]);
$msg->set('application_headers', $headers);
$channel->basic_publish($msg,$exchange,$queue);

 

 

<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;


$queue = "cache_queue";
$exchange = "cache_exchange";



//獲取連接
$connection = new AMQPStreamConnection('192.168.2.245', 5672, 'guest', 'guest');
//從連接中創建通道
$channel = $connection->channel();


$channel->exchange_declare($exchange, 'direct',false,true,false);



$channel->queue_declare($queue,false,true,false,false,false);
$channel->queue_bind($queue, $exchange,$queue);



$msg = new AMQPMessage('Hello World');
$headers = new \PhpAmqpLib\Wire\AMQPTable([
    "retry_nums"=>0
]);
$msg->set('application_headers', $headers);
$channel->basic_publish($msg,$exchange,$queue);
echo date('Y-m-d H:i:s')." [x] Sent 'Hello World!' ".PHP_EOL;




//while ($wait) {
//    $channel->wait();
//}

$channel->close();
$connection->close();

 

 

消費者:

消費者假如消費異常或者失敗,則把消息放入一個臨時隊列  暫且命名為

rety_cache_exchange
rety_cache_queue_10


臨時隊列rety_cache_queue_10 設置一個ttl過期時間 ,然后綁定一個死信隊列(死信隊列為原始生產消息隊列);這樣當消息過期了,消息自動進入死信隊列(原始隊列);就實現了 消息嘗試的機制
設置頭部參數
retry_nums++  的目的 就是為了得到消息已經消費了多少次  ;可以當消息達到消費5次后 就告警 入庫db操作
<?php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;


$queue = "cache_queue";
$exchange = "cache_exchange";

$rety_queue =  "rety_cache_queue_10";
$rety_exchange =  "rety_cache_exchange";


//獲取連接
$connection = new AMQPStreamConnection('192.168.2.245', 5672, 'guest', 'guest');
//從連接中創建通道
$channel = $connection->channel();

$channel->exchange_declare($exchange, 'direct',false,true,false);

$channel->queue_declare($queue,false,true,false,false,false);
$channel->queue_bind($queue, $exchange,$queue);


$channel2 = $connection->channel();

$channel2->exchange_declare($rety_exchange, 'direct',false,true,false);
$tale = new \PhpAmqpLib\Wire\AMQPTable();
$tale->set('x-dead-letter-exchange', $exchange);
$tale->set('x-dead-letter-routing-key',$queue);
$tale->set('x-message-ttl',50000);
$channel2->queue_declare($rety_queue,false,true,false,false,false,$tale);
$channel2->queue_bind($rety_queue, $rety_exchange,$rety_queue);








$channel->basic_qos(null, 1, null);
$channel->basic_consume($queue, '', false, false, false, false, function(AMQPMessage $message) use ($rety_exchange,$rety_queue,$channel2){
    echo $message->body,PHP_EOL;
    $msg_headers = $message->get('application_headers')->getNativeData();
    var_dump($msg_headers);
    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
    $msg = new AMQPMessage($message->body);
    $headers = new \PhpAmqpLib\Wire\AMQPTable([
        "retry_nums"=>intval($msg_headers['retry_nums'])+1
    ]);
    $msg->set('application_headers', $headers);
    $channel2->basic_publish($msg,$rety_exchange,$rety_queue);


});

function shutdown($channel, $connection)
{
    $channel->close();
    $connection->close();
}
register_shutdown_function('shutdown', $channel, $connection);

while (count($channel->callbacks)) {
    $channel->wait();
}

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM