先決條件
本教程假定 RabbitMQ 已經安裝,並運行在localhost
標准端口(5672)。如果你使用不同的主機、端口或證書,則需要調整連接設置。從哪里獲得幫助
如果您在閱讀本教程時遇到困難,可以通過郵件列表 聯系我們。
發布/訂閱
(使用 .NET Client)
在 教程[2] 中,我們創建了一個工作隊列,假設在工作隊列中的每一個任務都只被分發給一個 Worker。那么在這一章節,我們要做與之完全不同的事,那就是我們將要把一條消息分發給多個消費者。這種模式被稱為“發布/訂閱”。
為了說明、體現這種模式,我們將會建一個簡單的日志系統。它將會包含兩個程序 - 第一個用來發送日志消息,第二個用來接收並打印它們。
在我們建立的日志系統中,每個接收程序的運行副本都會收到消息。這樣我們就可以運行一個接收程序接收消息並將日志寫入磁盤;同時運行另外一個接收程序接收消息並將日志打印到屏幕上。
實質上,發布的日志消息將會被廣播給所有的接收者。
交換器
在教程的前幾部分,我們是發送消息到隊列並從隊列中接收消息。現在是時候介紹 Rabbit 中完整的消息傳遞模型了。
讓我們快速回顧一下前面教程中的內容:
- 生產者是發送消息的用戶應用程序。
- 隊列是存儲消息的緩沖區。
- 消費者是接收消息的用戶應用程序。
在 RabbitMQ 中,消息傳遞模型的核心理念是生產者從來不會把任何消息直接發送到隊列,其實,通常生產者甚至不知道消息是否會被分發到任何隊列中。
然而,生產者只能把消息發送給交換器。交換器非常簡單,一方面它接收來自生產者的消息,另一方面又會把接收的消息推送到隊列中。交換器必須明確知道該如何處理收到的消息,應該追加到一個特定隊列中?還是應該追加到多個隊列中?或者應該把它丟棄?這些規則都被定義在交換器類型中。
目前交換器類型有這幾種:direct
,topic
,headers
和fanout
。我們先重點關注最后一個fanout
,我們創建一個這種類型的交換器,將其命名為logs
:
channel.ExchangeDeclare("logs", "fanout");
fanout
類型交換器非常簡單,正如您可能從名字中猜出的那樣,它會把收到的所有消息廣播到它已知的所有隊列中。這恰巧是我們的日志系統目前所需要的。
列舉交換器
要列舉出服務器上的交換器,您可以使用非常有用的rabbitmqctl
命令行工具:sudo rabbitmqctl list_exchanges
執行上述命令后,出現的列表中將會有一些
amq.*
交換器和默認(未命名)交換器。這些是默認創建的,不過目前您可能用不到它們。默認交換器
在教程的前些部分,我們對交換器這一概念還一無所知,但仍然可以把消息發送到隊列。之所以這樣,是因為我們使用了一個用空字符串(""
)標識的默認交換器。回顧一下我們之前如何發布消息:
var message = GetMessage(args); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange: "", routingKey: "hello", basicProperties: null, body: body);
第一個參數就是交換器的名稱,空字符串表示默認或匿名交換器:將消息路由到
routingKey
指定的隊列(如果存在)中。
現在,我們可以把消息發布到我們指定的交換器:
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);
臨時隊列
您是否還記得之前我們使用過的隊列,它們都有一個特定的名稱(記得應該是hello
和task_queue
吧)。給隊列命名對我們來說是至關重要的 -- 因為我們可能需要多個 Worker 指向同一個隊列;當您想要在生產者和消費者之間共享隊列時,給隊列一個名稱也是非常重要的。
但是,我們創建的日志系統並不希望如此。我們希望監聽所有的日志消息,而不僅僅是其中一部分。我們也只對目前流動的消息感興趣,而不是舊消息。為解決這個問題,我們需要做好兩件事。
首先,我們無論何時連接 Rabbit,都需要一個新的、空的隊列。要做到這一點,我們可以使用隨機名稱來創建隊列,或許,甚至更好的方案是讓服務器為我們選擇一個隨機隊列名稱。
其次,一旦我們與消費者斷開連接,與之相關的隊列應該被自動刪除。
在 .NET 客戶端中,如果不向QueueDeclare()
方法提供任何參數,實際上就是創建了一個非持久化、獨占、且自動刪除的隨機命名隊列:
var queueName = channel.QueueDeclare().QueueName;
您可以在 隊列指南 中了解更多關於exclusive
參數和其他隊列屬性的信息。
此時,queueName
包含一個隨機隊列名稱。例如,它看起來可能像amq.gen-JzTY20BRgKO-HjmUJj0wLg
。
綁定
我們已經創建好了一個fanout
交換器和一個隊列。現在我們需要告訴交換器把消息發送到我們的隊列。而交換器和隊列之間的關系就稱之為綁定。
// 把一個隊列綁定到指定交換器。
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
從現在起,logs
交換器會把消息追加到我們的隊列中。
列舉綁定
您可以使用(您或許已經猜到了),列舉出現有的綁定。sudo rabbitmqctl list_bindings
組合在一起
生產者程序負責分發消息,這與之前的教程看起來沒有太大區別。
最重要的變化是我們現在想把消息發布到我們的logs
交換器,而不是匿名交換器。在發送時我們需要提供一個路由鍵routingKey
,但是對於fanout
交換器,它的值可以被忽略。這里是EmitLog.cs
文件的代碼:
using System;
using RabbitMQ.Client;
using System.Text;
class EmitLog
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var message = GetMessage(args);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",
routingKey: "",
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent {0}", message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
private static string GetMessage(string[] args)
{
return ((args.Length > 0)
? string.Join(" ", args)
: "info: Hello World!");
}
}
(EmitLog.cs 源碼)
如你所見,在建立連接后,我們聲明了交換器。這一步非常有必要,因為發布消息到一個不存在的交換器,這種情況是被禁止的。
如果沒有隊列綁定到交換器上,消息將會丟失,但這對我們來說並沒有什么沒問題;如果沒有消費者正在監聽,我們是可以放心地把消息丟棄的。
ReceiveLogs.cs
的代碼:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
class ReceiveLogs
{
public static void Main()
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
Console.WriteLine(" [*] Waiting for logs.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
Console.WriteLine(" [x] {0}", message);
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
(ReceiveLogs.cs 源碼)
按照 教程[1]中的設置說明生成EmitLogs
和ReceiveLogs
項目。
如果您想把日志保存到文件中,只需打開一個控制台並輸入:
cd ReceiveLogs
dotnet run > logs_from_rabbit.log
如果你想在屏幕上看到日志,我可以新開一個終端並運行:
cd ReceiveLogs
dotnet run
當然,分發日志需要輸入:
cd EmitLog
dotnet run
使用rabbitmqctl list_bindings
命令,您可以驗證代碼是否真正創建了我們想要的綁定和隊列。當有兩個ReceiveLogs.cs
程序運行時,您應該看到如下所示的內容:
sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs exchange amq.gen-JzTY20BRgKO-HjmUJj0wLg queue []
# => logs exchange amq.gen-vso0PVvyiRIL2WoV3i48Yg queue []
# => ...done.
對執行結果的解釋簡潔明了:來自logs
交換器的數據轉發到了兩個由服務器隨機分配名稱的隊列。這正是我們期待的結果。
想要了解如何監聽消息的這一塊內容,讓我們繼續閱讀 教程[4]。
寫在最后
本文翻譯自 RabbitMQ 官方教程 C# 版本。如本文介紹內容與官方有所出入,請以官方最新內容為准。水平有限,翻譯的不好請見諒,如有翻譯錯誤還請指正。
- 原文鏈接:RabbitMQ tutorial - Publish/Subscribe
- 實驗環境:RabbitMQ 3.7.4 、.NET Core 2.1.3、Visual Studio Code
- 最后更新:2018-06-11