簡介
為提高系統高可用,生產者在發送message
需要通過MQ
回復ACK
來確保message
被成功存儲。
RabbitMQ
官網對Publisher Confirms有相關介紹,可惜example
代碼沒有php
版本的,且php-amqplib
也是一幫志願者在維護,精力有限,沒有完整的使用example
php-amqblib
也有開發者提了Issues
希望完善example
https://github.com/php-amqpli...
本文將會使用php來實現Publisher Confirms
,希望對大家有所幫助,代碼已提交到github
:https://github.com/jiaoyang3/...
幾種策落
事務
事務可以保證原子性操作RabbitMQ,使用起來也很簡單。
$channel->tx_select();//begin trx $channel->tx_commit();//commit trx $channel->tx_rollback();//rollback
測試example
,運行之后發現沒有message
提交到queue
<?php require_once '../../vendor/autoload.php'; use PhpAmqpLib\Message\AMQPMessage; use RabbitMQ\RabbitMQ; try { $rabbit = new RabbitMQ(); $channel = $rabbit->getChannel(); $channel->tx_select();//begin trx $queueName = 'test-single-queue2'; $rabbit->createQueue($queueName, false, true, false, false); for ($i = 0; $i < 10000; $i++) { $rabbit->sendMessage($i . "this is a test message.", $queueName, '', [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT //消息持久化,重啟rabbitmq,消息不會丟失 ]); if ($i == 10) { throw new Exception('rollbock'); } } $channel->tx_commit();//commit trx unset($rabbit);//close } catch (Exception $e) { $channel->tx_rollback();//rollback echo $e->getMessage(); }
單條ack
生產者每發送一條message
,MQ
服務會對出數據進行持久化存儲成功之后再回復ack/nack
消息給生產者。
開啟confirm
$channel->confirm_select();
注冊ack/nack回掉方法
//ack callback function $channel->set_ack_handler(function (AMQPMessage $message){ echo 'ack' . $message->getBody() . PHP_EOL; }); //nack callback function $channel->set_nack_handler(function (AMQPMessage $message){ echo 'ack' . $message->getBody() .PHP_EOL; });
設置ack/nack
超時時間
$channel->wait_for_pending_acks_returns(5);//set wait time
測試代碼如下,完整代碼可訪問github
<?php require_once '../../vendor/autoload.php'; use PhpAmqpLib\Message\AMQPMessage; use RabbitMQ\RabbitMQ; $rabbit = new RabbitMQ(); $channel = $rabbit->getChannel(); $channel->confirm_select();//open confirm //ack callback function $channel->set_ack_handler(function (AMQPMessage $message){ echo 'ack' . $message->getBody() . PHP_EOL; }); //nack callback function $channel->set_nack_handler(function (AMQPMessage $message){ echo 'ack' . $message->getBody() .PHP_EOL; }); $queueName = 'test-single-queue1'; $rabbit->createQueue($queueName, false, true, false, false); for ($i = 0; $i < 10000; $i++) { $message = $i . "this is a test message."; $rabbit->sendMessage($message, $queueName, '', [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT ]); echo $message . ' has been sent' . PHP_EOL; $channel->wait_for_pending_acks_returns(5);//set wait time sleep(1); } unset($rabbit);//關閉連接
多條ack
每條message
都ack
性能很低
這里進行批量ack
,設置每50個message
集中ack
,代碼也很簡單
<?php require_once '../../vendor/autoload.php'; use PhpAmqpLib\Message\AMQPMessage; use RabbitMQ\RabbitMQ; $rabbit = new RabbitMQ(); $channel = $rabbit->getChannel(); $channel->confirm_select();//open confirm //ack callback function $channel->set_ack_handler(function (AMQPMessage $message){ echo 'ack' . $message->getBody() . PHP_EOL; }); //nack callback function $channel->set_nack_handler(function (AMQPMessage $message){ echo 'ack' . $message->getBody() .PHP_EOL; }); $queueName = 'test-single-queue'; $rabbit->createQueue($queueName, false, true, false, false); //每次都ack性能很低,批量ack,設置每50個message集中ack $batchSize = 50; $outstandingMessageCount = 0; for ($i = 0; $i < 10000; $i++) { $message = $i . "this is a test message."; $rabbit->sendMessage($message, $queueName, '', [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT //消息持久化,重啟rabbitmq,消息不會丟失 ]); echo $message . ' 已發送' . PHP_EOL; if (++$outstandingMessageCount == $batchSize) { echo '------'; $channel->wait_for_pending_acks_returns(5); $outstandingMessageCount = 0; } sleep(1); } if ($outstandingMessageCount > 0) { $channel->wait_for_pending_acks_returns(5); } unset($rabbit);//關閉連接
異步ack
毫無疑問,異步的效率是最高的,可惜沒有發現php-amqplib
並沒有實現,一種語言用的人越多生態越好,php
已不復當年。