簡介
為提高系統高可用,生產者在發送message需要通過MQ回復ACK來確保message被成功存儲。

RabbitMQ官網對Publisher Confirms有相關介紹,可惜example代碼沒有php版本的,且php-amqplib也是一幫志願者在維護,精力有限,沒有完整的使用example
php-amqblib也有開發者提了Issues希望完善example
https://github.com/php-amqpli...
本文將會使用php來實現Publisher Confirms,希望對大家有所幫助,代碼已提交到github:https://github.com/jiaoyang3/...
幾種策落
事務
事務可以保證原子性操作RabbitMQ,使用起來也很簡單。
$channel->tx_select();//begin trx $channel->tx_commit();//commit trx $channel->tx_rollback();//rollback
測試example,運行之后發現沒有message提交到queue
<?php
require_once '../../vendor/autoload.php';
use PhpAmqpLib\Message\AMQPMessage;
use RabbitMQ\RabbitMQ;
try {
$rabbit = new RabbitMQ();
$channel = $rabbit->getChannel();
$channel->tx_select();//begin trx
$queueName = 'test-single-queue2';
$rabbit->createQueue($queueName, false, true, false, false);
for ($i = 0; $i < 10000; $i++) {
$rabbit->sendMessage($i . "this is a test message.", $queueName, '', [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT //消息持久化,重啟rabbitmq,消息不會丟失
]);
if ($i == 10) {
throw new Exception('rollbock');
}
}
$channel->tx_commit();//commit trx
unset($rabbit);//close
} catch (Exception $e) {
$channel->tx_rollback();//rollback
echo $e->getMessage();
}
單條ack
生產者每發送一條message,MQ服務會對出數據進行持久化存儲成功之后再回復ack/nack消息給生產者。
開啟confirm
$channel->confirm_select();
注冊ack/nack回掉方法
//ack callback function
$channel->set_ack_handler(function (AMQPMessage $message){
echo 'ack' . $message->getBody() . PHP_EOL;
});
//nack callback function
$channel->set_nack_handler(function (AMQPMessage $message){
echo 'ack' . $message->getBody() .PHP_EOL;
});
設置ack/nack超時時間
$channel->wait_for_pending_acks_returns(5);//set wait time
測試代碼如下,完整代碼可訪問github
<?php
require_once '../../vendor/autoload.php';
use PhpAmqpLib\Message\AMQPMessage;
use RabbitMQ\RabbitMQ;
$rabbit = new RabbitMQ();
$channel = $rabbit->getChannel();
$channel->confirm_select();//open confirm
//ack callback function
$channel->set_ack_handler(function (AMQPMessage $message){
echo 'ack' . $message->getBody() . PHP_EOL;
});
//nack callback function
$channel->set_nack_handler(function (AMQPMessage $message){
echo 'ack' . $message->getBody() .PHP_EOL;
});
$queueName = 'test-single-queue1';
$rabbit->createQueue($queueName, false, true, false, false);
for ($i = 0; $i < 10000; $i++) {
$message = $i . "this is a test message.";
$rabbit->sendMessage($message, $queueName, '', [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT
]);
echo $message . ' has been sent' . PHP_EOL;
$channel->wait_for_pending_acks_returns(5);//set wait time
sleep(1);
}
unset($rabbit);//關閉連接
多條ack
每條message都ack性能很低
這里進行批量ack,設置每50個message集中ack,代碼也很簡單
<?php
require_once '../../vendor/autoload.php';
use PhpAmqpLib\Message\AMQPMessage;
use RabbitMQ\RabbitMQ;
$rabbit = new RabbitMQ();
$channel = $rabbit->getChannel();
$channel->confirm_select();//open confirm
//ack callback function
$channel->set_ack_handler(function (AMQPMessage $message){
echo 'ack' . $message->getBody() . PHP_EOL;
});
//nack callback function
$channel->set_nack_handler(function (AMQPMessage $message){
echo 'ack' . $message->getBody() .PHP_EOL;
});
$queueName = 'test-single-queue';
$rabbit->createQueue($queueName, false, true, false, false);
//每次都ack性能很低,批量ack,設置每50個message集中ack
$batchSize = 50;
$outstandingMessageCount = 0;
for ($i = 0; $i < 10000; $i++) {
$message = $i . "this is a test message.";
$rabbit->sendMessage($message, $queueName, '', [
'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT //消息持久化,重啟rabbitmq,消息不會丟失
]);
echo $message . ' 已發送' . PHP_EOL;
if (++$outstandingMessageCount == $batchSize) {
echo '------';
$channel->wait_for_pending_acks_returns(5);
$outstandingMessageCount = 0;
}
sleep(1);
}
if ($outstandingMessageCount > 0) {
$channel->wait_for_pending_acks_returns(5);
}
unset($rabbit);//關閉連接
異步ack
毫無疑問,異步的效率是最高的,可惜沒有發現php-amqplib並沒有實現,一種語言用的人越多生態越好,php已不復當年。
