rabbitmq延迟队列的实现(利用死信队列)


普通的延迟队列不细说了,无论是设置统一的队列TTL,还是设置消息的TTL,全都是利用DeadLetterQueue:消息失效后扔到死信队列,消费者从死信队列里读消息。但在消息失效的过程中存在一个问题,比如如下场景:

延迟队列中依次收到如下消息 Message A: TTL 2000 Message B: TTL 100 Message C: TTL 5000 当延迟队列中消息超时后,移至死信队列

实际执行情况是rabbitMQ从队列头取到消息A,等待2秒,超时,发至死信队列 -> 判断消息B,发现已经超时,立即发至死信队列。原因是TTL被存在消息的内部,rabbitMQ一直去扫描每条消息的TTL,而是只判断队列头消息是否失效,于是消息B实际失效时间是2000ms。

目前rabbitMQ是不支持任意超时时间的(据说rocketMQ提供有限支持,没用过),但可以通过安装一个插件来解决:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange

安装过程很简单,下载下来,在rabbitMQ的sbin目录下执行安装

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

插件使用Exchange交换机来实现的TTL,而不是rabbitMQ那样通过死信队列:首先设置一个可延迟的Exchange,该Exchange会将收到的消息存放,按照延迟时间排序,直到达到延迟,才被转发到实际的执行队列。

启用插件后,进入rabbitMQ的管理页面进行配置:

新建一个Exchange,Type必须选择x-delayed-message,添加参数x-delayed-type=direct,然后绑定到执行队列

$msg = new AMQPMessage(json_encode([ 'event' => $eventName, 'params' => $params ]), [ 'delivery_mode' => 2, ]); $headers = new AMQPTable(['x-delay' => $delay]); $msg->set('application_headers', $headers); $this->channel->basic_publish($msg, self::EXCHANGE_DELAY_ANY_NAME, self::EXCHANGE_KEY_NOW);

接下来,为投递到该Exchange的消息添加插件所需的header:x-delay,值就是要延迟的时间,单位毫秒,如果不添加这个头,所有流入该交换机的消息都会立即被转发到执行队列。注意如果使用php-amqplib的话,需要用AMQPTable设置这个header

接下来按照刚才的三条消息测试一下,发现B消息成功比A消息进入队列了。搞定

 

转载:http://www.xuyanzhe.cn/?p=115

具体实现参考链接: https://juejin.im/post/5a12ffd451882578da0d7b3a

高级特性:https://zhuanlan.zhihu.com/p/60141062


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM