原文來自 RabbitMQ 英文官網的教程(4.Routing),其示例代碼采用了 .NET C# 語言。
In the previous tutorial we built a simple logging system. We were able to broadcast log messages to many receivers.
在之前的教程中,我們構建了一個簡單的日志系統,我們可以廣播日志消息給眾多的接收人。
In this tutorial we're going to add a feature to it - we're going to make it possible to subscribe only to a subset of the messages. For example, we will be able to direct only critical error messages to the log file (to save disk space), while still being able to print all of the log messages on the console.
在本教程中我們即將為日志系統添加一個特性 - 使其可以只訂閱消息的一個子集。比如,我們只會直接將嚴重錯誤消息寫入到日志文件(即保存到磁盤空間),與此同時還會把所有的日志消息打印到控制台。
Bindings
綁定
In previous examples we were already creating bindings. You may recall code like:
在之前的示例中我們已經創建過綁定,你可能會回憶起類似的代碼:
channel.QueueBind(queue: queueName,
exchange: "logs",
routingKey: "");
A binding is a relationship between an exchange and a queue. This can be simply read as: the queue is interested in messages from this exchange.
綁定就是交換機和隊列之間的一種關系,可以簡單地理解為:某隊列對來自該交換機的消息感興趣。
Bindings can take an extra routingKey parameter. To avoid the confusion with a BasicPublish parameter we're going to call it a binding key. This is how we could create a binding with a key:
綁定可以使用一個額外的 routingKey 參數,為避免與 BasicPublish 參數相混淆我們將其稱呼為“綁定鍵”,如下便是如何采用該鍵來創建一個綁定:
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: "black");
The meaning of a binding key depends on the exchange type. The fanout exchanges, which we used previously, simply ignored its value.
“綁定鍵”的含義取決於交換機類型,像我們之前使用過的 fanout 型交換機,簡單起鍵其值在這里就忽略了。(fanout 型交換機是無差別廣播到所有隊列,即使為 routingKey 命名也沒有意義)
Direct exchange
直接型交換機
Our logging system from the previous tutorial broadcasts all messages to all consumers. We want to extend that to allow filtering messages based on their severity.
For example we may want the script which is writing log messages to the disk to only receive critical errors, and not waste disk space on warning or info log messages.
在我們之前的教程中,日志系統會廣播所有的消息給所有的消費者。我們希望基於系統所面臨的壓力來做一些諸如允許過濾消息這樣的擴充。
比如我們可能希望只接收嚴重錯誤的日志消息腳本,然后將其寫入磁盤,而不要在警告型或提示型日志消息方面浪費磁盤空間。
We were using a fanout exchange, which doesn't give us much flexibility - it's only capable of mindless broadcasting.
我們一直在使用 fanout 型交換機,但它給予不了我們足夠的靈活性 - 它僅僅適用於無差別(無腦式)的廣播。
We will use a direct exchange instead. The routing algorithm behind a direct exchange is simple - a message goes to the queues whose binding key exactly matches the routing key of the message.
好在我們可以使用 direct 型交換機,其背后的路由算法很簡單 - 即消息會流向其綁定鍵恰好與路由鍵相匹配的隊列。
To illustrate that, consider the following setup:
為說明這個,可以考慮接下來的設置:
In this setup, we can see the direct exchange X with two queues bound to it. The first queue is bound with binding key orange, and the second has two bindings, one with binding key black and the other one with green.
在這個設置中,我們可以看到符號為 X 的 direct 型交換機有兩個隊列關聯 到它。第一個隊列通過“綁定鍵” orange 來關聯,第二個隊列則有兩個綁定,一個基於 black “綁定鍵”,另一個基於 green “綁定鍵”。
In such a setup a message published to the exchange with a routing key orange will be routed to queue Q1. Messages with a routing key of black or green will go to Q2. All other messages will be discarded.
在這樣一番設置中,發布到基於 orange 路由鍵綁定的消息將會被路由到 Q1 隊列,而基於 black 或者 green 路由鍵的消息則去往 Q2 隊列,而其他所有的消息將會被丟棄。
Multiple bindings
多重綁定
It is perfectly legal to bind multiple queues with the same binding key. In our example we could add a binding between X and Q1 with binding key black. In that case, the direct exchange will behave like fanout and will broadcast the message to all the matching queues. A message with routing key black will be delivered to both Q1 and Q2.
采用相同的“綁定鍵”來綁定多個隊列是完全合法的。在我們的示例中,可以基於 black “綁定鍵”來添加一個 X 和 Q1 之間的綁定。如此,direct 型交換機將表現得與 fanout 型相像,並且會把消息廣播給所有匹配的隊列。如此,基於 black “路由鍵”的消息將會被遞送到 Q1 和 Q2 兩個隊列。
Emitting logs
發出日志
We'll use this model for our logging system. Instead of fanout we'll send messages to a direct exchange. We will supply the log severity as a routing key. That way the receiving script will be able to select the severity it wants to receive. Let's focus on emitting logs first.
我們將為日志系統使用以上模型,我們會在發送消息時使用 direct 型交換機,而不是 fanout 型。我們會基於日志的嚴重性作為路由鍵,這樣的話接收端腳本將可以選擇它期望接收的嚴重性。讓我們首先聚焦在發送日志方面。
As always, we need to create an exchange first:
一如既往,我們需要首先創建一個交換機:
channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
And we're ready to send a message:
接着我們准備好發送消息:
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
routingKey: severity,
basicProperties: null,
body: body);
To simplify things we will assume that 'severity' can be one of 'info', 'warning', 'error'.
為簡單起鍵,我們假定“severity”變量指的就是“info”、“warning”以及“error”中的一種。
Subscribing
訂閱
Receiving messages will work just like in the previous tutorial, with one exception - we're going to create a new binding for each severity we're interested in.
就像之前教程一樣,接收消息這一塊都運行得很好,唯獨有一處不同 -- 那就是我們 為自己所感興趣的(日志的)每一種嚴重性創建一個新的綁定。
var queueName = channel.QueueDeclare().QueueName;
foreach(var severity in args)
{
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: severity);
}
Putting it all together
融合一起
The code for EmitLogDirect.cs class:
EmitLogDirect.cs 類文件代碼:
using System;
using System.Linq;
using RabbitMQ.Client;
using System.Text;
class EmitLogDirect
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "direct_logs",
type: "direct");
var severity = (args.Length > 0) ? args[0] : "info";
var message = (args.Length > 1)
? string.Join(" ", args.Skip( 1 ).ToArray())
: "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
routingKey: severity,
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
The code for ReceiveLogsDirect.cs:
ReceiveLogsDirect.cs 類文件代碼:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
class ReceiveLogsDirect
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "direct_logs",
type: "direct");
var queueName = channel.QueueDeclare().QueueName;
if(args.Length < 1)
{
Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
Environment.GetCommandLineArgs()[0]);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}
foreach(var severity in args)
{
channel.QueueBind(queue: queueName,
exchange: "direct_logs",
routingKey: severity);
}
Console.WriteLine(" [*] Waiting for messages.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'",
routingKey, message);
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
Create projects as usual (see tutorial one for advice).
像往常一樣創建工程(看看教程第一章的建議)
If you want to save only 'warning' and 'error' (and not 'info') log messages to a file, just open a console and type:
如果你只想保存“warning”和“error”(不包括“info”)類型的日志消息到文件,只需打開控制台並輸入:
cd ReceiveLogsDirect
dotnet run warning error > logs_from_rabbit.log
If you'd like to see all the log messages on your screen, open a new terminal and do:
如果你想在顯示屏上看到所有的日志消息,可以打開新的終端並輸入:
cd ReceiveLogsDirect
dotnet run info warning error
# => [*] Waiting for logs. To exit press CTRL+C
And, for example, to emit an error log message just type:
比如,為了產生一個 error 級別的日志消息只需輸入:
cd EmitLogDirect
dotnet run error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'
(Full source code for (EmitLogDirect.cs source) and (ReceiveLogsDirect.cs source))