死信队列:
DLX(Dead-Letter-Exchange),可以称为死信交换器。当消息在一个队列中变成死信(dead message)之后,它能被重新发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称为死信队列。
消息变成死信队列有下面几个情况:
- 消息被拒绝(channel.basicNack或channel.basicReject),并且设置requeue参数为false;
- 消息过期;
- 队列达到最大长度。
死信队列案例:
private void btnBasicPublish_Click(object sender, EventArgs e)
{
string exchangeName = "myexchange1";
string queueName = "myqueue3";
string bindingKey = "myroutingkey3";
string routingKey = "myroutingkey3";
//创建死信队列
this.CreateDeadQueue();
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);
channel.QueueDeclare(queueName, true, false, false,
new Dictionary<string, object> {
{ "x-message-ttl", 5000 },//过期时间5s
{ "x-dead-letter-exchange", DEAD_EXCHANGE_NAME },//死信队列交换机
{ "x-dead-letter-routing-key", DEAD_QUEUE_NAME }//死信队列routingKey
});
channel.QueueBind(queueName, exchangeName, bindingKey, null);
//消息持久化
var properties = channel.CreateBasicProperties();
properties.Persistent = true;//相当于设置DeliveryMode=2
//5:发布消息
for (int i = 0; i < 5; i++)
{
var msg = Encoding.UTF8.GetBytes($"{i}:haha");
channel.BasicPublish(exchangeName, routingKey, properties, msg);
}
}
}
/// <summary>
/// 创建死信交换机、死信队列、并绑定
/// 死信交换机、队列就是一个普通交换机、队列
/// </summary>
private void CreateDeadQueue()
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(DEAD_EXCHANGE_NAME, ExchangeType.Direct, true, false, null);
channel.QueueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
channel.QueueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_QUEUE_NAME, null);
}
}
延时队列
延时队列存储的对象是对应的延迟消息,延迟消息指当消息被发送到队列后,并不立即被消费者拿到消息,而是等待特定的时间后,消费者才能拿到消息。
延时队列需要安装一个插件,下载地址:https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange
延时队列应用场景有:
- 在订单系统中,用户下单后,通常有30分钟的付款时间。如果30分钟后没有付款,则订单被取消,这里可以使用延时队列处理订单。
- 用户通过远程控制家里的智能设备在指定时间进行工作,可以将用户指定发送到延时队列,到达时间后再推送到智能设备。
- 在RabbitMQ中,延时队列通过前面的DLX和TTL共同作用可以模拟出延迟队列功能。
通过声明一个x-delayed-message类型的exchange来使用delayed-messaging特性
x-delayed-message是插件提供的类型,并不是rabbitmq本身的
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
发送消息的时候通过在header添加”x-delay”参数来控制消息的延时时间
byte[] messages= "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", properties.build(), messages);
和死信队列消费者监听正常队列不同,延时队列中消费者监听的是死信队列。当消费在设置DLX和TTL后,发送到队列中经过指定时候后变成死信,死信重新发送到死信队列,而消费者监听到死信队列中有消息而进行消费,这样就达到了消息的延时。
参考:
https://blog.csdn.net/yaomingyang/article/details/102753004