代碼中,我們通常這樣聲明一個隊列:
//聲明隊列 channel.QueueDeclare ( queue: QueueName, //隊列名稱 durable: false, //隊列是否持久化.false:隊列在內存中,服務器掛掉后,隊列就沒了;true:服務器重啟后,隊列將會重新生成.注意:只是隊列持久化,不代表隊列中的消息持久化!!!! exclusive: false, //隊列是否專屬,專屬的范圍針對的是連接,也就是說,一個連接下面的多個信道是可見的.對於其他連接是不可見的.連接斷開后,該隊列會被刪除.注意,不是信道斷開,是連接斷開.並且,就算設置成了持久化,也會刪除. autoDelete: true, //如果所有消費者都斷開連接了,是否自動刪除.如果還沒有消費者從該隊列獲取過消息或者監聽該隊列,那么該隊列不會刪除.只有在有消費者從該隊列獲取過消息后,該隊列才有可能自動刪除(當所有消費者都斷開連接,不管消息是否獲取完) arguments: null //隊列的配置 );
對於第5個參數: arguments ,
它的類型是一個鍵值對集合 :

它到底有哪些key呢?
我們可以通過 RabbitMQ 的管理頁面看到:

一共10個:
- Message TTL : 消息生存期
- Auto expire : 隊列生存期
- Max length : 隊列可以容納的消息的最大條數
- Max length bytes : 隊列可以容納的消息的最大字節數
- Overflow behaviour : 隊列中的消息溢出后如何處理
- Dead letter exchange : 溢出的消息需要發送到綁定該死信交換機的隊列
- Dead letter routing key : 溢出的消息需要發送到綁定該死信交換機,並且路由鍵匹配的隊列
- Maximum priority : 最大優先級
- Lazy mode : 懶人模式
- Master locator :
Message TTL
官方 : How long a message published to a queue can live before it is discarded (milliseconds). (Sets the "x-message-ttl" argument.)
翻譯 : 一個隊列中的消息,在被丟棄之前能夠存活多少毫秒.( key 為 "x-message-ttl").通俗講就是,隊列中的消息的生存周期,單位毫秒.
測試 :
internal class Program { private static void Main(string[] args) { Dictionary<string, object> arguments = new Dictionary<string, object> { { "x-message-ttl", 10000 } }; Producer.Send(arguments); Console.ReadKey(); } } public class Producer { private const string QueueName = "test_queue"; public static void Send(IDictionary<string,object> arguments) { using (IConnection connection = ConnectionHelper.GetConnection()) using (IModel channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, false, false, false, arguments); string msg = "hello world "; channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg)); Console.WriteLine($"{DateTime.Now} : send {msg}"); } } }
我們從管理頁面該隊列的消息概述中可以看出,這條消息只存活了10秒鍾.

為了加以對比,我還另外創建了一個沒有聲明任何參數的隊列:

可以看出,測試的這條隊列的特征(Features)一欄中,被打上了"TTL"標簽.
這里有個非常重要的知識點需要注意!
隊列一旦聲明,參數將無法更改,添加,刪除,也就是說,對上述"test_queue"隊列進行如下操作都會拋出異常:
- 修改該隊列的 "x-message-ttl" 參數為 "20000" 毫秒 :
Dictionary<string, object> arguments = new Dictionary<string, object> { { "x-message-ttl", 20000 } };
- 試圖通過傳入 null 來刪除該隊列的所有已設置的參數 :
Dictionary<string, object> arguments = null;
- 試圖添加新的參數 :
Dictionary<string, object> arguments = new Dictionary<string, object> { { "x-message-ttl", 10000 } }; arguments.Add("x-expires", 12345);
通通不行!!!
要改變一個隊列的參數,只有兩種辦法:
- 刪除該隊列,重新創建;
- 換個名字,創建一個新的隊列.
Auto expire
官方 : How long a queue can be unused for before it is automatically deleted (milliseconds).(Sets the "x-expires" argument.)
翻譯 : 隊列多長時間(毫秒)沒有被使用(訪問)就會被刪除.換個說法就是,當隊列在指定的時間內沒有被使用(訪問)就會被刪除.
測試 :
private static void Main(string[] args) { Dictionary<string, object> arguments = new Dictionary<string, object> { {"x-message-ttl", 10000},//設置隊列中的消息存活期為 10 秒 { "x-expires", 20000}//設置隊列的存活期 20 秒 }; Producer.Send(arguments); Console.ReadKey(); }
可以看到,隊列特征一欄中,多了一個"Exp"標簽

當然,10秒后,消息會被刪除,20秒后,隊列被刪除
但是,如果我們同時創建一個消費者,監聽該隊列,如下:
public class Consumer { private const string QueueName = "test_queue"; public static void Receive() { IConnection connection = ConnectionHelper.GetConnection(); IModel channel = connection.CreateModel(); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (s, e) => { byte[] bytes = e.Body; string str = Encoding.Default.GetString(bytes); Console.WriteLine("consumer receive : " + str); }; channel.BasicConsume(QueueName, true, "", false, false, null, consumer); } }
private static void Main(string[] args) { Dictionary<string, object> arguments = new Dictionary<string, object> { {"x-message-ttl", 10000},//設置隊列中的消息存活期為 10 秒 { "x-expires", 20000}//設置隊列的存活期 20 秒 }; Producer.Send(arguments); Consumer.Receive();//創建一個消費者監聽該隊列 Console.ReadKey(); }
那該隊列永遠不會被刪除.因為雖然它里面沒有消息,但一直有消費者在使用(訪問)它,所以它不會被刪除.
Max length
官方 : How many (ready) messages a queue can contain before it starts to drop them from its head.(Sets the "x-max-length" argument.)
翻譯 : 隊列可以容納的消息的最大條數,超過這個條數,隊列頭部的消息將會被丟棄.
測試 : 我們設置隊列最多只能容納 1 條消息,然后一次性發送10條,發送完畢后,創建一個消費者去消費,部分代碼如下:
生產者
for (int i = 0; i < 10; i++) { string msg = "hello world " + i; channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg)); Console.WriteLine($"{DateTime.Now} : send {msg}"); }
private static void Main(string[] args) { Dictionary<string, object> arguments = new Dictionary<string, object> { {"x-message-ttl", 10000},//設置隊列中的消息存活期為 10 秒 { "x-expires", 20000},//設置隊列的存活期 20 秒 {"x-max-length",1 },//設置隊列中的消息的最大條數為 1 條,超過1條,則遵循隊列的"先進先出(丟)"原則. }; Producer.Send(arguments); Thread.Sleep(1000); Consumer.Receive();//創建一個消費者監聽該隊列 Console.ReadKey(); }
運行結果:

可以看到,消費者消費到的是"hello world 9", "hello world 0" - "hello world 8"都被隊列丟棄了.
標簽如下:

Max length bytes
官方 : Total body size for ready messages a queue can contain before it starts to drop them from its head.(Sets the "x-max-length-bytes" argument.)
翻譯 : 隊列可以容納的消息的最大字節數,超過這個字節數,隊列頭部的消息將會被丟棄.
測試 : 我們首先設置隊列最多只能容納12個字節.
Dictionary<string, object> arguments = new Dictionary<string, object> { {"x-message-ttl", 10000},//設置隊列中的消息存活期為 10 秒 { "x-expires", 20000},//設置隊列的存活期 20 秒 {"x-max-length-bytes",12 },//設置隊列中的消息的最大字節數 };
接下來要分兩種情況了:
一.發送一條 15個字節的消息:"新年快樂啊"(我們采用UTF8編碼,1個漢字占3個字節),發送完畢后,創建一個消費者去消費.(當然,消費者也要用UTF8來接收)
string msg = "新年快樂啊"; channel.BasicPublish("", QueueName, null, Encoding.UTF8.GetBytes(msg)); Console.WriteLine($"{DateTime.Now} : send {msg}");

可以看到,消費者並沒有收到消息.說明整條消息都被丟棄了,而不是想象中的只丟棄"新年快樂" ,剩個"啊".
二.連續發送2條累計15個字節的消息.
channel.BasicPublish("", QueueName, null, Encoding.UTF8.GetBytes("新年快樂")); channel.BasicPublish("", QueueName, null, Encoding.UTF8.GetBytes("啊")); Console.WriteLine($"{DateTime.Now} : 消息發送完畢 ");

可以看到, 消費者收到了"啊".
隊列標簽如下 :

Overflow behaviour
官方 : Sets the queue overflow behaviour. This determines what happens to messages when the maximum length of a queue is reached. Valid values are drop-head or reject-publish.
翻譯 : 隊列中的消息溢出時,如何處理這些消息.要么丟棄隊列頭部的消息,要么拒絕接收后面生產者發送過來的所有消息.( 從上面兩個參數的測試中可以看出,"drop-head" 應該是默認行為) ,官方只給了 value,沒給 key . key 為 "x-overflow".
測試 : 沿用 Max length 參數解釋中的示例:
private static void Main(string[] args) { Dictionary<string, object> arguments = new Dictionary<string, object> { {"x-max-length",1 },//設置隊列中的消息的最大條數為 1 條,超過1條,則遵循隊列的"先進先出(丟)"原則. {"x-overflow","reject-publish" },//設置隊列中的消息溢出后,該隊列的行為:"拒絕接收"(所有消息) }; Producer.Send(arguments); Thread.Sleep(1000); Consumer.Receive();//創建一個消費者監聽該隊列 Console.ReadKey(); }
運行結果:

可以看到,這次消費者消費的是" hello world 0" ,不再是 "hello world 9" 了,因為生產者發送的"hello world 1" - "hello world 9"被隊列拒絕了.
標簽如下:

Dead letter exchange
官方 : Optional name of an exchange to which messages will be republished if they are rejected or expire.(Sets the "x-dead-letter-exchange" argument.)
翻譯 : 該參數值為一個(死信)交換機的名稱,當隊列中的消息的生存期到了,或者因長度限制被丟棄時,消息會被推送到(綁定到)這台交換機(的隊列中),而不是直接丟掉.
對於這個參數,有兩點需要特別注意:
一.前面的文章中提到過:

所以,在測試前,我們需要創建一個隊列,並綁定到該交換機.當然,交換機也需要提前創建好.
為了方便,我們在管理頁面創建交換機和隊列,並完成"綁定"動作.
創建交換機
1.先選擇"Exchanges"標簽
2.點擊"Add a new exchange"標簽.

這里又要注意了,交換機類型我們要選擇"fanout"模式.這種模式下,交換機會將生產者發送的消息分發到所有綁定到它的隊列.細心的朋友應該能看出來為什么.
因為這個參數只是傳入了交換機的名稱,沒有傳入"Routing Key".
創建隊列
創建一個新隊列 : "test_dead_letter_queue" ,圖就不上了,開篇就已經提到了.
綁定交換機
有兩種方式:
1.在交換機詳情頁面輸入要綁定的隊列.

2.在隊列詳情頁面輸入要綁定的交換機.

上述工作都完成后,我們開始正式測試該參數.
我們設置隊列可以容納的消息最多1條,多的拒絕接收.
private static void Main(string[] args) { Dictionary<string, object> arguments = new Dictionary<string, object> { {"x-max-length",1 },//設置隊列中的消息的最大條數為 1 條,超過1條,則遵循隊列的"先進先出(丟)"原則. {"x-overflow","reject-publish" },//設置隊列中的消息溢出后,隊列的行為:"拒絕接收"(任何消息) {"x-dead-letter-exchange","test_dead_letter_exchange" }, }; Producer.Send(arguments); Thread.Sleep(1000); Consumer.Receive(); Console.ReadKey(); }
生產者的代碼我們沿用 Max length 參數解釋中的代碼.
運行后,我們去看后台.

咦!?"test_dead_letter_queue"隊列怎么一條消息都沒有呢?
為什么交換機沒有把 "test_queue" 丟棄的9條消息推送到該隊列呢?
這是什么情況?
這就是這個參數第2個需要注意的點了.
二.我們再讀一次官方對該參數的解釋:
Optional name of an exchange to which messages will be republished if they are rejected or expire.
個人覺得,官方這里用 rejected 不是太准確,當然,也可能是我理解得還不夠深入,再加上英語太差.
為什么這么說呢?
我最開始的理解是 : 被拒絕或到期的消息會被推送到新的交換機,所以我在參數中傳入了 {"x-overflow","reject-publish" }
我認為隊列拒絕的這9條消息,就應該被交換機推送到綁定到它的隊列去!
實際上,官方用詞 "rejected" 應該被翻譯成 : "丟棄"或者"拋棄",而不是"拒絕",也就是說,這里的 rejected 和 expire 是隊列里面的消息的狀態.而不是隊列的動作.
當我們注釋掉 {"x-overflow","reject-publish" } ,改用默認的 "drop-head" 行為時,運行一切正常:

關於到期( expire )的測試,這里就不上圖了.
這個參數的標簽是 : DLX
Dead letter routing key
官方 : Optional replacement routing key to use when a message is dead-lettered. If this is not set, the message's original routing key will be used.(Sets the "x-dead-letter-routing-key" argument.)
翻譯 : 略......
測試 :
我們先刪除上述 "Dead letter exchange" 參數解釋中創建的交換機及隊列.
然后重新創建交換機 "test_dead_letter_exchange",並將其類型設置為"direct".(很多帖子叫它"直連模式"或者"路由模式",叫"topic"為"主題模式",但我更新喜歡叫"direct"為"精確匹配模式",叫"topic"為"模糊匹配模式",感覺好理解一些)
接着重新創建兩個隊列 "test_dead_letter_queue1" 和 "test_dead_letter_queue2",並將它們綁定到新的交換機.綁定的時候,將隊列"test_dead_letter_queue1"的路由鍵設置為"test".

private static void Main(string[] args) { Dictionary<string, object> arguments = new Dictionary<string, object> { {"x-max-length",1 },//設置隊列中的消息的最大條數為 1 條,超過1條,則遵循隊列的"先進先出(丟)"原則. {"x-dead-letter-exchange","test_dead_letter_exchange" }, {"x-dead-letter-routing-key","test" }, }; Producer.Send(arguments); Console.ReadKey(); } }
運行結果及標簽:

Maximum priority
官方 : Maximum number of priority levels for the queue to support; if not set, the queue will not support message priorities.(Sets the "x-max-priority" argument.)
翻譯 : 設置該隊列中的消息的優先級最大值.發布消息的時候,可以指定消息的優先級,優先級高的先被消費.如果沒有設置該參數,那么該隊列不支持消息優先級功能.也就是說,就算發布消息的時候傳入了優先級的值,也不會起什么作用.
測試 :
生產者
public class Producer { private const string QueueName = "test_queue"; public static void Send(IDictionary<string, object> arguments) { using (IConnection connection = ConnectionHelper.GetConnection()) using (IModel channel = connection.CreateModel()) {
var pros = channel.CreateBasicProperties();//構造消息的屬性 channel.QueueDeclare(QueueName, false, false, false, arguments); for (byte i = 0; i < 10; i++) { string msg = "hello world " + i; pros.Priority = i; channel.BasicPublish("", QueueName, pros, Encoding.Default.GetBytes(msg)); Console.WriteLine($"{DateTime.Now} : send {msg}"); } } } }
上述代碼表明,"hello world 0-9" 的優先級為 9 - 0.
需要注意的是,優先級屬性的類型為 byte.
private static void Main(string[] args) { Dictionary<string, object> arguments = new Dictionary<string, object> { {"x-max-priority",255 }, }; Producer.Send(arguments); Thread.Sleep(1000); Consumer.Receive();//創建一個消費者監聽該隊列 Console.ReadKey(); }
運行結果:

問題又來了,如果聲明隊列時,優先級最大值設置的是 5 ,那么這10條消息的消費順序應該是怎樣的呢?我們直接看結果:

這個有點意思......
該參數的標簽為 : Pri
Lazy mode
官方 : Set the queue into lazy mode, keeping as many messages as possible on disk to reduce RAM usage; if not set, the queue will keep an in-memory cache to deliver messages as fast as possible.(Sets the "x-queue-mode" argument.)
翻譯 : 設置隊列為懶人模式.該模式下的隊列會先將交換機推送過來的消息(盡可能多的)保存在磁盤上,以減少內存的占用.當消費者開始消費的時候才加載到內存中;如果沒有設置懶人模式,隊列則會直接利用內存緩存,以最快的速度傳遞消息.
測試 : 使用簡單隊列公平分發測試.
生產者
public class Producer { private const string QueueName = "test_queue"; public static void Send(IDictionary<string, object> arguments) { using (IConnection connection = ConnectionHelper.GetConnection()) using (IModel channel = connection.CreateModel()) { channel.QueueDeclare(QueueName, false, false, false, arguments); channel.BasicQos(0, 1, false); for (byte i = 0; i < 10; i++) { string msg = "hello world " + i; channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg)); Console.WriteLine($"{DateTime.Now} : send {msg}"); } } } }
消費者
public class Consumer { private const string QueueName = "test_queue"; public static void Receive() { IConnection connection = ConnectionHelper.GetConnection(); IModel channel = connection.CreateModel(); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (s, e) => { byte[] bytes = e.Body; string str = Encoding.Default.GetString(bytes); Console.WriteLine("consumer receive : " + str); channel.BasicAck(e.DeliveryTag, false); Thread.Sleep(5000);//5秒確認一條消息 }; channel.BasicConsume(QueueName, false, "", false, false, null, consumer); } }
控制台
private static void Main(string[] args) { Dictionary<string, object> arguments = new Dictionary<string, object> { {"x-queue-mode","lazy" }, }; Producer.Send(arguments); Thread.Sleep(10000); Consumer.Receive();//生產者消息發送完畢10秒后再創建消費者 Console.ReadKey(); }
運行結果:
當10條消息發送完畢后,我們看管理后台的隊列詳情:

可以非常清楚看到,內存中是沒有任何消息的.總數和已准備都是130B.
130B怎么來的? "hello world 0" 一共13個字節,一共10條,13*10=130.
10秒后,消費者創建,並開始消費.

參數的標簽為 : Args (這個標簽有點特別...)
其實關於懶人模式和默認模式還有很多細節,各自的優缺點等.比如上面的例子,我故意讓生產者一次性發了10條消息到隊列,並且隊列每次只傳遞一條到消費者,但是消費者開始消費的時候,隊列就一次性把10條消息都讀取到了內存.
再比如,持久化的消息與非持久化的消息,結合懶人模式等等.
還有默認模式和懶人模式的效率,性能比較等.
估計需要單獨寫個帖子了.
終於寫到最后一個參數了
Master locator
官方 : Set the queue into master location mode, determining the rule by which the queue master is located when declared on a cluster of nodes.(Sets the "x-queue-master-locator" argument.)
集群相關設置,暫時放一邊去!
