先決條件
本教程假定 RabbitMQ 已經安裝,並運行在localhost
標准端口(5672)。如果你使用不同的主機、端口或證書,則需要調整連接設置。從哪里獲得幫助
如果您在閱讀本教程時遇到困難,可以通過郵件列表 聯系我們。
主題
(使用 .NET 客戶端)
在 教程[4] 中,我們改進了我們日志系統。我們用direct
交換器替換了只能呆滯廣播消息的fanout
交換器,從而可以有選擇性的接收日志。
雖然使用direct
交換器改進了我們的系統,但它仍然有局限性 - 不能基於多個標准進行路由。
在我們的日志系統中,我們可能不僅要根據日志的嚴重性訂閱日志,可能還要根據日志分發源來訂閱日志。或許您可能從 unix syslog 工具中了解過這種概念,syslog 工具在路由日志的時候是可以既基於嚴重性(info/warn/crit...)又基於設備(auth/cron/kern...)的。
這種機制會給我們帶來極大的靈活性 - 我們可以僅監聽來自cron
的關鍵錯誤日志,與此同時,監聽來自kern
的所有日志。
要在我們的日志系統中實現這一特性,我們需要學習更復雜的topic
交換器。
Topic交換器
發送到topic
交換器的消息不能隨意指定routing key
,它必須是一個由點分割的單詞列表,這些單詞可以是任意內容,但通常會在其中指定一些與消息相關的特性。請看一些合法的路由鍵示例:stock.usd.nyse
,nyse.vmw
,quick.orange.rabbit
,路由鍵可以包含任意數量的單詞,但不能超過255個字節的上限。
binding key
也必須是相同的形式,topic
交換器的背后邏輯與direct
交換器類似 - 使用指定路由鍵發送的消息會被分發到與其綁定鍵匹配的所有隊列中。不過對於綁定鍵來說,有兩個重要的特殊情況需要注意:
*
(星號)可以代替一個單詞。#
(哈希)可以代替零個或多個單詞。
下圖示例是對上述內容最簡單的解釋:
在這個示例中,我們打算發送的消息全是用來描述動物的,這些消息會使用由三個單詞(兩個點)組成的路由鍵來發送。在路由鍵中,第一個單詞用來描述行動速度、第二個是顏色、第三個是物種,即:<speed>.<colour>.<species>
。
我們創建了三個綁定:Q1綁定了鍵.orange.
,Q2綁定了鍵*.*.rabbit
和lazy.#
。
這些綁定可以被概括為:
- Q1對所有橙色的動物感興趣。
- Q2對兔子以及所有行動緩慢的動物感興趣。
路由鍵為quick.orange.rabbit
的消息會被發送到這兩個隊列,消息lazy.orange.elephant
也會被發送到這兩個隊列。另外,quick.orange.fox
只會進入第一個隊列,lazy.brown.fox
只會進入第二個隊列。lazy.pink.rabbit
只會被發送到第二個隊列一次,盡管它匹配了兩個綁定(避免了消息重復)。quick.brown.fox
沒有匹配的綁定,因此它將會被丟棄。
如果我們打破約定,發送使用一個或四個單詞(例如:orange
和quick.orange.male.rabbit
)作路由鍵的消息會發生什么?答案是,這些消息因為沒有匹配到任何綁定,將被丟棄。
但是,另外,例如路由鍵為lazy.orange.male.rabbit
的消息,盡管它有四個單詞,也會匹配最后一個綁定,並將被發送到第二個隊列。
Topics 交換器
topic
交換器的功能是很強大的,它可以表現出一些其他交換器的行為。
當一個隊列與鍵#
(哈希)綁定時, 它會忽略路由鍵,接收所有消息,這就像fanout
交換器一樣。
當特殊字符*
(星號)和#
(哈希)未在綁定中使用時,topic
交換器的行為就像direct
交換器一樣。
組合在一起
我們將要在我們的日志系統中使用topic
交換器,首先假設日志的路由鍵有兩個單詞組成:<facility>.<severity>
。
代碼與上一篇 教程 中的代碼幾乎相同。
EmitLogTopic.cs
的代碼:
using System;
using System.Linq;
using RabbitMQ.Client;
using System.Text;
class EmitLogTopic
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "topic_logs",
type: "topic");
var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
var message = (args.Length > 1)
? string.Join(" ", args.Skip(1).ToArray())
: "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "topic_logs",
routingKey: routingKey,
basicProperties: null,
body: body);
Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
}
}
}
ReceiveLogsTopic.cs
的代碼:
using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
class ReceiveLogsTopic
{
public static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
var queueName = channel.QueueDeclare().QueueName;
if(args.Length < 1)
{
Console.Error.WriteLine("Usage: {0} [binding_key...]",
Environment.GetCommandLineArgs()[0]);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
Environment.ExitCode = 1;
return;
}
foreach(var bindingKey in args)
{
channel.QueueBind(queue: queueName,
exchange: "topic_logs",
routingKey: bindingKey);
}
Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
var routingKey = ea.RoutingKey;
Console.WriteLine(" [x] Received '{0}':'{1}'",
routingKey,
message);
};
channel.BasicConsume(queue: queueName,
autoAck: true,
consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
請運行以下示例:
要接收所有日志:
cd ReceiveLogsTopic
dotnet run "#"
要接收來自設備kern
的所有日志:
cd ReceiveLogsTopic
dotnet run "kern.*"
或者,如果您只想監聽級別為critical
的日志:
cd ReceiveLogsTopic
dotnet run "*.critical"
您可以創建多個綁定:
cd ReceiveLogsTopic
dotnet run "kern.*" "*.critical"
使用路由鍵kern.critical
發出日志:
cd EmitLogTopic
dotnet run "kern.critical" "A critical kernel error"
希望運行這些程序能讓您玩得開心。要注意的是,這些代碼沒有針對路由鍵和綁定鍵做任何預設,您可以嘗試使用兩個以上的路由鍵參數。
( EmitLogTopic.cs 和 ReceiveLogsTopic.cs 的完整源碼)
接下來,在 教程[6] 中將了解如何將往返消息作為遠程過程調用。
寫在最后
本文翻譯自 RabbitMQ 官方教程 C# 版本。如本文介紹內容與官方有所出入,請以官方最新內容為准。水平有限,翻譯的不好請見諒,如有翻譯錯誤還請指正。
- 原文鏈接:RabbitMQ tutorial - Topics
- 實驗環境:RabbitMQ 3.7.4 、.NET Core 2.1.3、Visual Studio Code
- 最后更新:2018-09-06