简介
为提高系统高可用,生产者在发送message
需要通过MQ
回复ACK
来确保message
被成功存储。
RabbitMQ
官网对Publisher Confirms有相关介绍,可惜example
代码没有php
版本的,且php-amqplib
也是一帮志愿者在维护,精力有限,没有完整的使用example
php-amqblib
也有开发者提了Issues
希望完善example
https://github.com/php-amqpli...
本文将会使用php来实现Publisher Confirms
,希望对大家有所帮助,代码已提交到github
:https://github.com/jiaoyang3/...
几种策落
事务
事务可以保证原子性操作RabbitMQ,使用起来也很简单。
$channel->tx_select();//begin trx $channel->tx_commit();//commit trx $channel->tx_rollback();//rollback
测试example
,运行之后发现没有message
提交到queue
<?php require_once '../../vendor/autoload.php'; use PhpAmqpLib\Message\AMQPMessage; use RabbitMQ\RabbitMQ; try { $rabbit = new RabbitMQ(); $channel = $rabbit->getChannel(); $channel->tx_select();//begin trx $queueName = 'test-single-queue2'; $rabbit->createQueue($queueName, false, true, false, false); for ($i = 0; $i < 10000; $i++) { $rabbit->sendMessage($i . "this is a test message.", $queueName, '', [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT //消息持久化,重启rabbitmq,消息不会丢失 ]); if ($i == 10) { throw new Exception('rollbock'); } } $channel->tx_commit();//commit trx unset($rabbit);//close } catch (Exception $e) { $channel->tx_rollback();//rollback echo $e->getMessage(); }
单条ack
生产者每发送一条message
,MQ
服务会对出数据进行持久化存储成功之后再回复ack/nack
消息给生产者。
开启confirm
$channel->confirm_select();
注册ack/nack回掉方法
//ack callback function $channel->set_ack_handler(function (AMQPMessage $message){ echo 'ack' . $message->getBody() . PHP_EOL; }); //nack callback function $channel->set_nack_handler(function (AMQPMessage $message){ echo 'ack' . $message->getBody() .PHP_EOL; });
设置ack/nack
超时时间
$channel->wait_for_pending_acks_returns(5);//set wait time
测试代码如下,完整代码可访问github
<?php require_once '../../vendor/autoload.php'; use PhpAmqpLib\Message\AMQPMessage; use RabbitMQ\RabbitMQ; $rabbit = new RabbitMQ(); $channel = $rabbit->getChannel(); $channel->confirm_select();//open confirm //ack callback function $channel->set_ack_handler(function (AMQPMessage $message){ echo 'ack' . $message->getBody() . PHP_EOL; }); //nack callback function $channel->set_nack_handler(function (AMQPMessage $message){ echo 'ack' . $message->getBody() .PHP_EOL; }); $queueName = 'test-single-queue1'; $rabbit->createQueue($queueName, false, true, false, false); for ($i = 0; $i < 10000; $i++) { $message = $i . "this is a test message."; $rabbit->sendMessage($message, $queueName, '', [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT ]); echo $message . ' has been sent' . PHP_EOL; $channel->wait_for_pending_acks_returns(5);//set wait time sleep(1); } unset($rabbit);//关闭连接
多条ack
每条message
都ack
性能很低
这里进行批量ack
,设置每50个message
集中ack
,代码也很简单
<?php require_once '../../vendor/autoload.php'; use PhpAmqpLib\Message\AMQPMessage; use RabbitMQ\RabbitMQ; $rabbit = new RabbitMQ(); $channel = $rabbit->getChannel(); $channel->confirm_select();//open confirm //ack callback function $channel->set_ack_handler(function (AMQPMessage $message){ echo 'ack' . $message->getBody() . PHP_EOL; }); //nack callback function $channel->set_nack_handler(function (AMQPMessage $message){ echo 'ack' . $message->getBody() .PHP_EOL; }); $queueName = 'test-single-queue'; $rabbit->createQueue($queueName, false, true, false, false); //每次都ack性能很低,批量ack,设置每50个message集中ack $batchSize = 50; $outstandingMessageCount = 0; for ($i = 0; $i < 10000; $i++) { $message = $i . "this is a test message."; $rabbit->sendMessage($message, $queueName, '', [ 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT //消息持久化,重启rabbitmq,消息不会丢失 ]); echo $message . ' 已发送' . PHP_EOL; if (++$outstandingMessageCount == $batchSize) { echo '------'; $channel->wait_for_pending_acks_returns(5); $outstandingMessageCount = 0; } sleep(1); } if ($outstandingMessageCount > 0) { $channel->wait_for_pending_acks_returns(5); } unset($rabbit);//关闭连接
异步ack
毫无疑问,异步的效率是最高的,可惜没有发现php-amqplib
并没有实现,一种语言用的人越多生态越好,php
已不复当年。