死信隊列:
DLX(Dead-Letter-Exchange),可以稱為死信交換器。當消息在一個隊列中變成死信(dead message)之后,它能被重新發送到另一個交換器中,這個交換器就是DLX,綁定DLX的隊列就稱為死信隊列。
消息變成死信隊列有下面幾個情況:
- 消息被拒絕(channel.basicNack或channel.basicReject),並且設置requeue參數為false;
- 消息過期;
- 隊列達到最大長度。
死信隊列案例:
private void btnBasicPublish_Click(object sender, EventArgs e)
{
string exchangeName = "myexchange1";
string queueName = "myqueue3";
string bindingKey = "myroutingkey3";
string routingKey = "myroutingkey3";
//創建死信隊列
this.CreateDeadQueue();
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);
channel.QueueDeclare(queueName, true, false, false,
new Dictionary<string, object> {
{ "x-message-ttl", 5000 },//過期時間5s
{ "x-dead-letter-exchange", DEAD_EXCHANGE_NAME },//死信隊列交換機
{ "x-dead-letter-routing-key", DEAD_QUEUE_NAME }//死信隊列routingKey
});
channel.QueueBind(queueName, exchangeName, bindingKey, null);
//消息持久化
var properties = channel.CreateBasicProperties();
properties.Persistent = true;//相當於設置DeliveryMode=2
//5:發布消息
for (int i = 0; i < 5; i++)
{
var msg = Encoding.UTF8.GetBytes($"{i}:haha");
channel.BasicPublish(exchangeName, routingKey, properties, msg);
}
}
}
/// <summary>
/// 創建死信交換機、死信隊列、並綁定
/// 死信交換機、隊列就是一個普通交換機、隊列
/// </summary>
private void CreateDeadQueue()
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare(DEAD_EXCHANGE_NAME, ExchangeType.Direct, true, false, null);
channel.QueueDeclare(DEAD_QUEUE_NAME, true, false, false, null);
channel.QueueBind(DEAD_QUEUE_NAME, DEAD_EXCHANGE_NAME, DEAD_QUEUE_NAME, null);
}
}
延時隊列
延時隊列存儲的對象是對應的延遲消息,延遲消息指當消息被發送到隊列后,並不立即被消費者拿到消息,而是等待特定的時間后,消費者才能拿到消息。
延時隊列需要安裝一個插件,下載地址:https://bintray.com/rabbitmq/community-plugins/rabbitmq_delayed_message_exchange
延時隊列應用場景有:
- 在訂單系統中,用戶下單后,通常有30分鍾的付款時間。如果30分鍾后沒有付款,則訂單被取消,這里可以使用延時隊列處理訂單。
- 用戶通過遠程控制家里的智能設備在指定時間進行工作,可以將用戶指定發送到延時隊列,到達時間后再推送到智能設備。
- 在RabbitMQ中,延時隊列通過前面的DLX和TTL共同作用可以模擬出延遲隊列功能。
通過聲明一個x-delayed-message類型的exchange來使用delayed-messaging特性
x-delayed-message是插件提供的類型,並不是rabbitmq本身的
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
發送消息的時候通過在header添加”x-delay”參數來控制消息的延時時間
byte[] messages= "delayed payload".getBytes("UTF-8");
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties.Builder().headers(headers);
channel.basicPublish("my-exchange", "", properties.build(), messages);
和死信隊列消費者監聽正常隊列不同,延時隊列中消費者監聽的是死信隊列。當消費在設置DLX和TTL后,發送到隊列中經過指定時候后變成死信,死信重新發送到死信隊列,而消費者監聽到死信隊列中有消息而進行消費,這樣就達到了消息的延時。
參考:
https://blog.csdn.net/yaomingyang/article/details/102753004