PHP實戰RabbitMQ之生產者Confirm篇


簡介

為提高系統高可用,生產者在發送message需要通過MQ回復ACK來確保message被成功存儲。

RabbitMQ官網對Publisher Confirms有相關介紹,可惜example代碼沒有php版本的,且php-amqplib也是一幫志願者在維護,精力有限,沒有完整的使用example

php-amqblib也有開發者提了Issues希望完善example
https://github.com/php-amqpli...

本文將會使用php來實現Publisher Confirms,希望對大家有所幫助,代碼已提交到github:https://github.com/jiaoyang3/...

幾種策落

事務

事務可以保證原子性操作RabbitMQ,使用起來也很簡單。

$channel->tx_select();//begin trx
$channel->tx_commit();//commit trx
$channel->tx_rollback();//rollback

測試example,運行之后發現沒有message提交到queue

<?php

require_once '../../vendor/autoload.php';

use PhpAmqpLib\Message\AMQPMessage;
use RabbitMQ\RabbitMQ;

try {
    $rabbit  = new RabbitMQ();
    $channel = $rabbit->getChannel();
    $channel->tx_select();//begin trx

    $queueName = 'test-single-queue2';
    $rabbit->createQueue($queueName, false, true, false, false);

    for ($i = 0; $i < 10000; $i++) {
        $rabbit->sendMessage($i . "this is a test message.", $queueName, '', [
            'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT //消息持久化,重啟rabbitmq,消息不會丟失
        ]);
        if ($i == 10) {
            throw new Exception('rollbock');
        }
    }
    $channel->tx_commit();//commit trx
    unset($rabbit);//close
} catch (Exception $e) {
    $channel->tx_rollback();//rollback
    echo $e->getMessage();
}

單條ack

生產者每發送一條messageMQ服務會對出數據進行持久化存儲成功之后再回復ack/nack消息給生產者。

開啟confirm

$channel->confirm_select();

注冊ack/nack回掉方法

//ack callback function
$channel->set_ack_handler(function (AMQPMessage $message){
    echo 'ack' . $message->getBody() . PHP_EOL;
});
//nack callback function
$channel->set_nack_handler(function (AMQPMessage $message){
    echo 'ack' . $message->getBody() .PHP_EOL;
});

設置ack/nack超時時間

$channel->wait_for_pending_acks_returns(5);//set wait time

測試代碼如下,完整代碼可訪問github

<?php

require_once '../../vendor/autoload.php';

use PhpAmqpLib\Message\AMQPMessage;
use RabbitMQ\RabbitMQ;


$rabbit  = new RabbitMQ();
$channel = $rabbit->getChannel();
$channel->confirm_select();//open confirm
//ack callback function
$channel->set_ack_handler(function (AMQPMessage $message){
    echo 'ack' . $message->getBody() . PHP_EOL;
});
//nack callback function
$channel->set_nack_handler(function (AMQPMessage $message){
    echo 'ack' . $message->getBody() .PHP_EOL;
});

$queueName = 'test-single-queue1';
$rabbit->createQueue($queueName, false, true, false, false);

for ($i = 0; $i < 10000; $i++) {
    $message = $i . "this is a test message.";
    $rabbit->sendMessage($message, $queueName, '', [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
    ]);
    echo $message . '  has been sent' . PHP_EOL;
    $channel->wait_for_pending_acks_returns(5);//set wait time
    sleep(1);
}

unset($rabbit);//關閉連接

多條ack

每條messageack性能很低
這里進行批量ack,設置每50個message集中ack,代碼也很簡單

<?php

require_once '../../vendor/autoload.php';

use PhpAmqpLib\Message\AMQPMessage;
use RabbitMQ\RabbitMQ;


$rabbit  = new RabbitMQ();
$channel = $rabbit->getChannel();
$channel->confirm_select();//open confirm
//ack callback function
$channel->set_ack_handler(function (AMQPMessage $message){
    echo 'ack' . $message->getBody() . PHP_EOL;
});
//nack callback function
$channel->set_nack_handler(function (AMQPMessage $message){
    echo 'ack' . $message->getBody() .PHP_EOL;
});


$queueName = 'test-single-queue';
$rabbit->createQueue($queueName, false, true, false, false);

//每次都ack性能很低,批量ack,設置每50個message集中ack
$batchSize               = 50;
$outstandingMessageCount = 0;
for ($i = 0; $i < 10000; $i++) {
    $message = $i . "this is a test message.";
    $rabbit->sendMessage($message, $queueName, '', [
        'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT //消息持久化,重啟rabbitmq,消息不會丟失
    ]);
    echo $message . '  已發送' . PHP_EOL;
    if (++$outstandingMessageCount == $batchSize) {
        echo '------';
        $channel->wait_for_pending_acks_returns(5);
        $outstandingMessageCount = 0;
    }
    sleep(1);
}
if ($outstandingMessageCount > 0) {
    $channel->wait_for_pending_acks_returns(5);
}

unset($rabbit);//關閉連接

異步ack

毫無疑問,異步的效率是最高的,可惜沒有發現php-amqplib並沒有實現,一種語言用的人越多生態越好,php已不復當年。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM