1.隊列命名規范
{項目名稱}.{模塊名稱}.{其他}
如:KuxueDemo.Common.User
2.選擇合適的消息模型
TIPS:以下范例均使用 RabbitMQ.Client 5.1.2
- Simple queue(簡單隊列)
消息的生產者負責生產消息,消費者負責處理消息,在下圖中,"P"是我們的生產者,"C"是我們的消費者。中間的框是一個隊列-RabbitMQ代表使用者保留的消息緩沖區。

消息的發送:
public static void StartBasicPublish() { var factory = new ConnectionFactory() { Uri = new Uri("amqp://guest:guest@localhost:5672/") }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); string message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
消息接收:
public static void StartConsumer() { var factory = new ConnectionFactory() { Uri = new Uri("amqp://guest:guest@localhost:5672/") }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] Received {0}", message); }; channel.BasicConsume(queue: "hello", autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
- Work queue(工作隊列)

工作隊列的主要思想是避免在執行密集型任務而不得不等待所有得任務完成。我們安排任務在以后完成,我們將任務封裝為消息並將其發送到隊列,后台運行得工作進程將接收任務消息並最終執行任務作業。當運行許多任務消費者時,任務將在他們之間共享。這個概念在Web應用程序中特別有用,因為在Web應用程序中,不可能在較短的HTTP請求內處理復雜的任務。
工作隊列與簡單隊列不同的是,我們啟動了多個消費者去處理消息,這種消息模式是用得最多的一種模型,這里就不再放范例代碼了。
默認情況下,RabbitMQ將每個消息一次發送給下一個消費者,平均而言,每個消費者都會收到相同數量的消息。
消息確認
如果其中一個消費者者開始一項漫長的任務並僅部分完成而死掉,會發生什么。使用我們當前的代碼,RabbitMQ一旦向消費者發送了一條消息,便立即將其標記為刪除。在這種情況下,如果您殺死一個消費者,我們將丟失正在處理的消息,我們還將丟失所有發送給該特定消費者但尚未處理的消息。
但是我們不想丟失任何任務,如果一個消費者死亡,我們希望將任務交付給另一個消費者。
為了確保消息永不丟失,RabbitMQ支持"消息確認"。消費者發送回一個確認(告知),告知RabbitMQ特定的消息已被接收,處理,並且RabbitMQ可以自由刪除它。
如果消費者死了(其通道已關閉,連接已關閉或TCP連接丟失)而沒有發送確認,RabbitMQ將了解消息未完全處理,並將重新排隊。如果同時有其他消費者在線,它將很快將其重新分發給另一個消費者。這樣,您可以確保即使工人偶爾死亡也不會丟失任何消息。
沒有任何消息超時;消費者死亡時,RabbitMQ將重新傳遞消息。即使處理一條消息花費非常非常長的時間也沒關系。
默認情況下,手動消息確認處於打開狀態。在前面的示例中,我們通過將autoAck(“自動確認模式”)參數設置為true來明確關閉它們。完成任務后,是時候刪除此標志並手動從工作程序發送適當的確認了。
public static void StartConsumer() { var factory = new ConnectionFactory() { Uri = new Uri("amqp://guest:guest@localhost:5672/") }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "hello", durable: false, exclusive: false, autoDelete: false, arguments: null); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); Console.WriteLine(" [x] Received {0}", message); }; channel.BasicConsume(queue: "hello", autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
使用此代碼,我們可以確保,即使您在處理消息時使用CTRL + C殺死工作人員,也不會丟失任何信息。工人死亡后不久,所有未確認的消息將重新發送。
TIPS:使用手動確認模式時,一定不要忘記確認。
失敗隊列
有時候消費者處理消息時不可避免會出錯,為了確保每個消息能夠被處理,我們可以把錯誤的任務重新發送給一個專用來處理錯誤的隊列,這樣可以不影響消息處理的速度,也能在一定程度上保證每個消息都能盡可能的被正確處理。
隊列消息持久化
手動確認可以確保即使消費者死亡,任務也不會丟失。但是,如果RabbitMQ服務器停止,我們的任務仍然會丟失。
RabbitMQ退出或崩潰時,它將忘記隊列和消息,除非您告知不要這樣做。要確保消息不會丟失,需要做兩件事:我們需要將隊列和消息都標記為持久。
首先,我們需要確保RabbitMQ永遠不會丟失我們的隊列。為此,我們需要將其聲明為持久的(durable: true),
channel.QueueDeclare(queue: "hello", durable: true, exclusive: false, autoDelete: false, arguments: null);
為了確保重新啟動后,消息也不會丟失,還需要設置消息的持久化(Persistent=true),
var properties = channel.CreateBasicProperties(); properties.Persistent = true;
將消息標記為持久性並不能完全保證不會丟失消息。盡管它告訴RabbitMQ將消息保存到磁盤,但是RabbitMQ接受消息並且尚未保存消息時,還有很短的時間。而且,RabbitMQ不會對每條消息都執行異步保存,它可能只是保存到緩存中,而沒有真正寫入磁盤。持久性保證並不強,但是對於我們的簡單任務隊列而言,這已經綽綽有余了。如果您需要更強有力的保證,則可以使用 發布者確認。
任務公平分配
在有兩名消費者的情況下,當有的消息都很處理很復雜,有的消息處理很輕松時,就有可能出現一個消費者一直忙碌而另一個消費者幾乎不做任何工作。RabbitMQ對此一無所知,仍平均分配消息。
發生這種情況是因為RabbitMQ在消息進入隊列時才調度消息。它不會查看使用者的未確認消息數。它只是盲目地將每第n條消息發送給第n個使用者。
以下設置告訴RabbitMQ一次不要給消費者一個以上的消息,也就是說在消費者確認上一條消息完成之前,不會再將新的消息發送給消費者,而是將其分派給下一個空閑的消費者。
channel.BasicQos(0, 1, false);
如果所有的消費者都忙,隊列消息還在不停增加的情況下,還需要留意隊列消息達到上限。
- Publish/Subscribe(發布/訂閱模式)

此模式用日志系統為例說明,在我們的日志系統中,接收器程序的每個運行副本都將獲得消息。這樣,我們將能夠運行一個接收器並將日志定向到磁盤。同時我們將能夠運行另一個接收器並在屏幕上查看日志。本質上,已經發布的消息將被廣播到所有的接收者。RabbitMQ消息傳遞模型中的核心思想是生產者從不將任何消息直接發送到隊列,生產者甚至根本不知道是否將消息傳遞到任何隊列。相反,生產者只能將消息發送到交換機。一方面,交換機接收來自生產者的消息,另一方面,將它們推入隊列。交換機必須確切知道如何處理收到的消息,是否應將其附加到特定隊列,是否應該將其附加到許多隊列中,還是應該丟棄它。如何處理消息由exchange type定義。
exchange type有以下幾種類型:direct, topic, headers ,fanout. 這里日志所需要的就是 fanout 類型,定義如下:
string message = "Hello World!"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body);
這里的日志消費者,只能處理當前正在發送的消息,而無法處理已經發布的舊消息。無論何時,連接到Rabbit后,自動創建一個新的隊列,默認是隨機的名稱;一旦我們斷開連接,隊列將會被自動移除。
發送者代碼示例:
public static void Publish(string[] args) { var factory = new ConnectionFactory() { Uri = new Uri("amqp://guest:guest@localhost:5672/") }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare("logs", ExchangeType.Fanout); string input = ""; while (true) { input = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(input); channel.BasicPublish(exchange: "logs", routingKey: "", basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", input); if (input == "exit") { break; } } } } Console.WriteLine(" Press 'exit' to exit."); Console.ReadLine(); }
接受者代碼示例:
public static void Consume() { var factory = new ConnectionFactory() { Uri = new Uri("amqp://guest:guest@localhost:5672/") }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare("logs", ExchangeType.Fanout, durable: true); var queueName = channel.QueueDeclare().QueueName; Console.WriteLine("Queue name {0}", queueName); channel.QueueBind(queue: queueName, exchange: "logs", routingKey: ""); Console.WriteLine(" [*] Waiting for logs."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
- Routing(路由模式)
廣播模式算是錄有模式的簡化版,被綁定的隊列無差別接受廣播的任何消息,那么路由模式可以特定的接受某些類型的消息。例如:我們將只能將嚴重錯誤消息定向到日志文件(以節省磁盤空間),同時仍然能夠在控制台上打印所有日志消息。
Bindings 綁定
綁定時 exchange 和 隊列之間的關系。可以簡單的理解為:隊列對來自此exchange的消息感興趣。
channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: "black");
直接交換
廣播模式可以將所有消息廣播給所有使用者,我們想要擴展它以允許根據消息的嚴重性過濾消息。例如,我們可能希望將日志消息寫入磁盤的腳本僅接收嚴重錯誤,而不會在警告或信息日志消息上浪費磁盤空間。
使用 fanout 類型,這種類型並沒有給我們帶來太大的靈活性,它只能進行無意識的廣播。
使用 direct 類型,這種類型背后的算法很簡單,消息進入其 binding key 和消息的 routingKey完全匹配的隊列。

上圖設置中,我們看到綁定了兩個隊列的 direct exchange x,第一個隊列綁定為orange, 第二個綁定key為兩個:black, green.
使用routingKey為 orange 發布到 exchange 的消息將會路由到隊列 Q1, routingKey 為 black 或者 green 將轉到 Q2,所有其他消息將會被丟棄。
多重綁定

用相同的綁定密鑰綁定多個隊列是完全合法的。在我們的示例中,我們可以使用綁定鍵 black 在 X 和 Q1 之間添加綁定。在這種情況下,direct 交換類型將類似於 fanout,並將消息廣播到所有匹配的隊列。帶有黑色路由鍵的消息將同時傳遞給 Q1和Q2。
生產者代碼示例:
public static void Publish(string[] args) { var factory = new ConnectionFactory() { Uri = new Uri("amqp://guest:guest@localhost:5672/") }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct); var severity = (args.Length > 0) ? args[0] : "info"; var message = (args.Length > 1) ? string.Join(" ", args.Skip(1).ToArray()) : "Hello world"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "direct_logs", routingKey: severity, basicProperties: null, body: body); Console.WriteLine(" [x] Sent {0}", message); } } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); }
消費者代碼示例:
public static void Consume(string[] args) { var factory = new ConnectionFactory() { Uri = new Uri("amqp://guest:guest@localhost:5672/") }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct); var queueName = channel.QueueDeclare().QueueName; foreach (var severity in args) { channel.QueueBind(queue: queueName, exchange: "direct_logs", routingKey: severity); } Console.WriteLine(" [*] Waiting for messages."); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
- Topic(主題模式)
主題模式中,發給 topic exchange 的消息的routingKey可以時多個關鍵字,關鍵字以“.”號分隔,這些關鍵字最好與消息的某些功能息息相關。例如: "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit". routingKey可以包含任何單詞,最多255個字節。
binding key 也必須是相同的格式,topic exchange 和 direct exchange 的消息匹配模式相似:特定的 routing key 發送的消息將傳遞到所有能匹配的 binding key 的隊列,topic exchange 有兩個特殊的匹配語法:
a. * 號代替一個單詞
b. # 號代替零個或者多個單詞

主題模式在特定的匹配規則下可以與 fanout 和 direct 兩種模式一樣使用:
a. 隊列 binding key 設置為 # 號時,就與 fanout 模式一樣可以接收 exchange 的所有消息了
b. 隊列 binding key 不使用 # 和 * 號模糊關鍵字時,配置特定的關鍵字,就和 direct 模式一樣,接收特定的 routing key 的消息。
- RPC(請求/回復模式)
前面幾種消息模型處理消息時,消息制造者並不需要知道消息結果,適用於處理一些比較消耗資源(時間、硬件等)的任務。當遠程服務處理完消息並返回結果時,可以使用 RPC 模式。
盡管 RPC 模式使用比較普遍,但是也不能濫用 RPC,使用之前應該明確以下內容:
a. 明確的知道哪些函數時本地調用,哪些函數時遠程調用
b. 系統設計文檔里面清除表示各個組件之間的依賴關系
c. 做好遠程服務如果長時間宕機時的錯誤處理
如果對以上幾點不是很明確,盡可能的使用異步調用代替 RPC

RPC 的工作過程:
a. 客戶端啟動時,創建一個匿名的唯一隊列
b. 對於 RPC 請求,客戶端發送一條帶有兩個屬性的消息: ReplyTo(設置為回調隊列) 和 CorrelationId ( 對每個請求設置為唯一的值)
c. 請求被發送到 rpc_queue 隊列
d. RPC 工作程序(服務器)等待該隊列上的請求,出現請求時,它執行工作,並將包含結果的消息發送給 ReplyTo 屬性聲明的隊列。
f. 客戶端等待回調隊列上的數據。出現消息時,如果CorrelationId屬性與請求中的值匹配,則將消息返回給應用程序。
服務端:
public static void Server() { var factory = new ConnectionFactory() { Uri = new Uri("amqp://guest:guest@localhost:5672/") }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare(queue: "rpc_queue", durable: false, exclusive: false, autoDelete: false, arguments: null); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); channel.BasicConsume(queue: "rpc_queue", autoAck: false, consumer: consumer); Console.WriteLine(" [x] Awaiting RPC requests"); consumer.Received += (model, ea) => { string response = null; var body = ea.Body; var props = ea.BasicProperties; var replyProps = channel.CreateBasicProperties(); replyProps.CorrelationId = props.CorrelationId; try { var message = Encoding.UTF8.GetString(body); int n = int.Parse(message); Console.WriteLine(" [.] fib({0})", message); response = fib(n).ToString(); Thread.Sleep(TimeSpan.FromSeconds(10)); } catch (Exception e) { Console.WriteLine(" [.] " + e.Message); response = ""; } finally { var responseBytes = Encoding.UTF8.GetBytes(response); channel.BasicPublish(exchange: "", routingKey: props.ReplyTo, basicProperties: replyProps, body: responseBytes); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); } }; Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } }
客戶端 RpcClient:
namespace RabbitMqDemo { using System; using System.Collections.Concurrent; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; public class RpcClient { private readonly IConnection connection; private readonly IModel channel; private readonly string replyQueueName; private readonly EventingBasicConsumer consumer; private readonly BlockingCollection<string> respQueue = new BlockingCollection<string>(); private readonly IBasicProperties props; public RpcClient() { var factory = new ConnectionFactory() { Uri = new Uri("amqp://guest:guest@localhost:5672/") }; connection = factory.CreateConnection(); channel = connection.CreateModel(); replyQueueName = channel.QueueDeclare().QueueName; consumer = new EventingBasicConsumer(channel); props = channel.CreateBasicProperties(); var correlationId = Guid.NewGuid().ToString(); props.CorrelationId = correlationId; props.ReplyTo = replyQueueName; consumer.Received += (model, ea) => { var body = ea.Body; var response = Encoding.UTF8.GetString(body); if (ea.BasicProperties.CorrelationId == correlationId) { respQueue.Add(response); } }; } public string Call(string message) { var messageBytes = Encoding.UTF8.GetBytes(message); channel.BasicPublish( exchange: "", routingKey: "rpc_queue", basicProperties: props, body: messageBytes); channel.BasicConsume( consumer: consumer, queue: replyQueueName, autoAck: true); return respQueue.Take(); } public void Close() { channel.Close(); connection.Close(); } } }
客戶端啟動:
static void Main(string[] args) { var rpcClient = new RpcClient(); Console.WriteLine(" [x] Requesting fib(30)"); var response = rpcClient.Call("30"); Console.WriteLine(" [.] Got '{0}'", response); rpcClient.Close(); Console.ReadLine(); }
相對來看,RPC 模式更像是把簡單隊列的一種組合使用。
Publisher Confirms(發布者確認)
發布者確認對是AMQP 0.9.1 協議的擴展,默認情況下未啟用。發布者確認通過ConfirmSelect 方法在 channel 級別啟用。
var channel = connection.CreateModel(); channel.ConfirmSelect();
這個方法必須在每個 channel 創建時調用。發布者確認默認只需要開啟一次就可以了,並不需要每次發布消息都啟動。
Strategy # 1: 單個消息單獨確認
byte[] body = ...; BasicProperties properties = ...; channel.BasicPublish(exchange, queue, properties, body); // uses a 5 second timeout channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5));
像往常一樣發布一條消息,並等待通過 Channel.WaitForConfirmsOrDie(TimeSpan) 方法進行確認。確認消息后,該方法將立即返回。如果未在超時時間內確認該消息或該消息沒有被確認(這意味着代理出於某種原因無法處理該消息),則該方法將引發異常。異常的處理通常包括記錄錯誤消息和/或重試發送消息。
此技術非常簡單,但也有一個主要缺點:由於消息的確認會阻止所有后續消息的發布,因此它會大大降低發布速度。這種方法不會提供每秒超過數百條已發布消息的吞吐量。但是,對於某些應用程序來說這可能已經足夠了。
Strategy #2:批量確認消息
為了改進前面的示例,我們可以發布一批消息,並等待整個批次被確認。以下示例使用了100個批次:
var batchSize = 100; var outstandingMessageCount = 0; while (ThereAreMessagesToPublish()) { byte[] body = ...; BasicProperties properties = ...; channel.BasicPublish(exchange, queue, properties, body); outstandingMessageCount++; if (outstandingMessageCount == batchSize) { channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5)); outstandingMessageCount = 0; } } if (outstandingMessageCount > 0) { channel.WaitForConfirmsOrDie(new TimeSpan(0, 0, 5)); }
與等待確認單個消息相比,等待一批消息被確認可以極大地提高吞吐量(對於遠程RabbitMQ節點,這最多可以達到20-30次)。一個缺點是我們不知道發生故障時到底出了什么問題,因此我們可能必須將整個批處理保存在內存中,以記錄有意義的內容或重新發布消息。而且該解決方案仍然是同步的,因此它阻止了消息的發布。
Strategy #3: 異步確認
代理異步確認已發布的消息,只需在客戶端上注冊一個回調即可收到這些確認的通知:
var channel = connection.CreateModel(); channel.ConfirmSelect(); channel.BasicAcks += (sender, ea) => { // code when message is confirmed }; channel.BasicNacks += (sender, ea) => { //code when message is nack-ed };
摘要
某些應用程序中,確保將發布的消息送到 exchange 非常重要,需要根據實際情況選擇如何處理:
a. 逐條消息確認,同步等待確認:簡單,單吞吐量非常有限。
b.批量消息確認,同步等待批量確認:簡單,合理的吞吐量,但是很難推斷出什么時候除了問題
c.異步處理:最佳性能和資源使用,在出現錯誤的情況下可以很好的空值
3.使用建議
使用連接字符串代替多項配置
雖然單獨配置用戶名,密碼,連接實例名稱等也是可以的,但是為了減少配置項的數量,建議使用 uri 的方式直接創建連接。
connection 和 channel 都是需要釋放的資源
應用程序反復打開連接而不關閉它們,最終將導致節點資源枯竭。RabbitMQ的管理界面提供了打開連接數的圖表,建議程序在壓力測試過程中,關注一下連接數。
以下是建議的創建connection 和 channel 的方式
var factory = new ConnectionFactory() { Uri = new Uri("amqp://guest:guest@localhost:5672/") }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //... } }
建議代碼創建隊列,而不是手動創建
無論是創建 queue,還是創建 exchange ,或者創建綁定關系,建議使用用代碼創建,代碼里RabbitMQ聲明創建 exchange, queue 等是冪等的(不存在才創建),減少手動創建帶來不必要的錯誤。需要注意的是,聲明創建隊列時,同名的隊列配置不同會拋出異常。例如:已經存在名稱 hello 的隊列,配置 durable=true,再聲明創建的 hello 的隊列使用默認值不進行持久化,就會拋出異常。
謹慎使用發布者確認
理論上來說,確認消息能到達 exchange 能提高消息處理效率,如果需要確認,請選擇異步確認。即使項目實際要求對消息處理不是那么嚴格,建議聲明 BasicNacks 來使用日志記錄失敗的消息。
隊列持久化保證
系統在使用隊列時,就要考慮服務器重啟的情況,非自動創建的queue和exchange,建議聲明為持久化,消息持久化根據實際需求選擇。
