普通的延迟队列不细说了,无论是设置统一的队列TTL,还是设置消息的TTL,全都是利用DeadLetterQueue:消息失效后扔到死信队列,消费者从死信队列里读消息。但在消息失效的过程中存在一个问题,比如如下场景:
延迟队列中依次收到如下消息 Message A: TTL 2000 Message B: TTL 100 Message C: TTL 5000 当延迟队列中消息超时后,移至死信队列
实际执行情况是rabbitMQ从队列头取到消息A,等待2秒,超时,发至死信队列 -> 判断消息B,发现已经超时,立即发至死信队列。原因是TTL被存在消息的内部,rabbitMQ一直去扫描每条消息的TTL,而是只判断队列头消息是否失效,于是消息B实际失效时间是2000ms。
目前rabbitMQ是不支持任意超时时间的(据说rocketMQ提供有限支持,没用过),但可以通过安装一个插件来解决:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange
安装过程很简单,下载下来,在rabbitMQ的sbin目录下执行安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
插件使用Exchange交换机来实现的TTL,而不是rabbitMQ那样通过死信队列:首先设置一个可延迟的Exchange,该Exchange会将收到的消息存放,按照延迟时间排序,直到达到延迟,才被转发到实际的执行队列。
启用插件后,进入rabbitMQ的管理页面进行配置:
新建一个Exchange,Type必须选择x-delayed-message,添加参数x-delayed-type=direct,然后绑定到执行队列
$msg = new AMQPMessage(json_encode([ 'event' => $eventName, 'params' => $params ]), [ 'delivery_mode' => 2, ]); $headers = new AMQPTable(['x-delay' => $delay]); $msg->set('application_headers', $headers); $this->channel->basic_publish($msg, self::EXCHANGE_DELAY_ANY_NAME, self::EXCHANGE_KEY_NOW);
接下来,为投递到该Exchange的消息添加插件所需的header:x-delay,值就是要延迟的时间,单位毫秒,如果不添加这个头,所有流入该交换机的消息都会立即被转发到执行队列。注意如果使用php-amqplib的话,需要用AMQPTable设置这个header
接下来按照刚才的三条消息测试一下,发现B消息成功比A消息进入队列了。搞定
转载:http://www.xuyanzhe.cn/?p=115