RabbitMQ笔记-死信队列与延时队列


死信队列:

DLX(Dead-Letter-Exchange),可以称为死信交换器。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称为死信队列。

消息变成死信队列有下面几个情况:

  • 消息被拒绝(channel.basicNack或channel.basicReject),并且设置requeue参数为false;
  • 消息过期;
  • 队列达到最大长度。

死信队列案例:

 private void btnBasicPublish_Click(object sender, EventArgs e)
        {
            string exchangeName = "myexchange1";
            string queueName = "myqueue3";
            string bindingKey = "myroutingkey3";
            string routingKey = "myroutingkey3";
            //创建死信队列
            this.CreateDeadQueue();
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);
                channel.QueueDeclare(queueName, true, false, false,
                    new Dictionary<string, object> {
                        { "x-message-ttl", 5000 },//过期时间5s
                    { "x-dead-letter-exchange", DEAD_EXCHANGE_NAME },//死信队列交换机
                    { "x-dead-letter-routing-key", DEAD_QUEUE_NAME }//死信队列routingKey
                    });
                channel.QueueBind(queueName, exchangeName, bindingKey, null);

                //消息持久化
                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;//相当于设置DeliveryMode=2

                //5:发布消息
                for (int i = 0; i < 5; i++)
                {
                    var msg = Encoding.UTF8.GetBytes($"{i}:haha");
                    channel.BasicPublish(exchangeName, routingKey, properties, msg);
                }
            }
        }


/// <summary>
        /// 创建死信交换机、死信队列、并绑定
        /// 死信交换机、队列就是一个普通交换机、队列
        /// </summary>
        private void CreateDeadQueue()
        {
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(DEAD_EXCHANGE_NAME, ExchangeType.Direct, true, false, null);
                channel.QueueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
                channel.QueueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_QUEUE_NAME, null);
            }
        }

延时队列

延时队列存储的对象是对应的延迟消息,延迟消息指当消息被发送到队列后,并不立即被消费者拿到消息,而是等待特定的时间后,消费者才能拿到消息。
延时队列需要安装一个插件,下载地址:https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange

延时队列应用场景有:

  • 在订单系统中,用户下单后,通常有30分钟的付款时间。如果30分钟后没有付款,则订单被取消,这里可以使用延时队列处理订单。
  • 用户通过远程控制家里的智能设备在指定时间进行工作,可以将用户指定发送到延时队列,到达时间后再推送到智能设备。
  • 在RabbitMQ中,延时队列通过前面的DLX和TTL共同作用可以模拟出延迟队列功能。

通过声明一个x-delayed-message类型的exchange来使用delayed-messaging特性
x-delayed-message是插件提供的类型,并不是rabbitmq本身的

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);

发送消息的时候通过在header添加”x-delay”参数来控制消息的延时时间

byte[] messages= "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", properties.build(), messages);

和死信队列消费者监听正常队列不同,延时队列中消费者监听的是死信队列。当消费在设置DLX和TTL后,发送到队列中经过指定时候后变成死信,死信重新发送到死信队列,而消费者监听到死信队列中有消息而进行消费,这样就达到了消息的延时。

参考:
https://blog.csdn.net/yaomingyang/article/details/102753004


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM