[譯]RabbitMQ教程C#版 - 路由


先決條件
本教程假定 RabbitMQ 已經安裝,並運行在localhost 標准端口(5672)。如果你使用不同的主機、端口或證書,則需要調整連接設置。

從哪里獲得幫助
如果您在閱讀本教程時遇到困難,可以通過郵件列表 聯系我們

路由

(使用.NET客戶端)

教程[3] 中,我們構建了一個簡單的日志系統,可以向多個接收者廣播消息。

在本教程中,我們會為日志系統再添加一個特性,使其可以只訂閱消息的一個子集。例如,將所有日志消息打印到
控制台,同時只會將嚴重錯誤消息寫入日志文件(保存到磁盤空間)。

綁定

在前面的例子中,我們創建過綁定。不知道您是否還記得下面的代碼:

channel.QueueBind(queue: queueName,
                  exchange: "logs",
                  routingKey: "");

綁定是指交換器和隊列之間的關聯關系。可以簡單地理解為:某個隊列對來自此交換器的消息感興趣。

綁定可以采用額外的routingKey參數,為了避免與BasicPublish方法中相同參數混淆,我們將其稱為binding key(這里是指路由鍵從聲明角度的一種別稱,綁定鍵)。下面即是如何使用綁定鍵 建立一個綁定:

channel.QueueBind(queue: queueName,
                  exchange: "direct_logs",
                  routingKey: "black");

綁定鍵的含義取決於交換器類型。像我們前面使用的fanout 交換器,忽略了它的值(依據fanout交換器的特性,它會把消息廣播到所有訂閱的隊列,所以就算指定routingKey也不會根據其過濾消息)。

Direct交換器

在上篇教程中,我們的日志系統會把所有消息廣播給所有消費者,現在我們想要擴展使其可以根據消息的嚴重性過濾消息。例如,我們希望將日志消息寫入磁盤的腳本僅接收嚴重錯誤的消息,而不是在警告或者信息類型的消息上浪費磁盤空間。

之前我們使用的是fanout交換器,它沒有給我們足夠的靈活性 - 它只能進行無意識的廣播。

現在我們要用direct交換器替換它,direct交換器背后的路由算法很簡單 - 消息會進入其binding key恰好與routing key相匹配的隊列。
為了說明這一點,請參考以下設置:

在上面的設置中,我們可以看到direct交換器X與兩個隊列綁定。第一個隊列通過鍵orange綁定,第二個隊列有兩個綁定,一個通過鍵black綁定、另外一個通過鍵green綁定。

如此設置,發布使用路由鍵orange的消息到交換器最終會被路由到隊列Q1,路由鍵為blackgreen的消息會去向隊列Q2,而其他所有的消息會被丟棄。

多重綁定

使用相同的綁定鍵綁定多個隊列是完全合法的。在示例中,我們可以在XQ1之間添加一個鍵為black的綁定。這種情況下,direct交換器會像fanout交換器一樣,把消息廣播到所有匹配的隊列,路由鍵為black的消息會被分別發送到隊列Q1Q2

發送日志

我們將在日志系統中使用上述消息模型,在發送消息時使用direct交換機來替換fanout交換器。同時我們會把日志的嚴重性作為路由鍵,這樣的話,接收腳本就可以選擇性地接收它期望嚴重性的消息。首先我們來關注如何發送日志。

同樣地,我們需要先創建一個交換器:

channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);

准備好發送消息:

var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",
                     routingKey: severity,
                     basicProperties: null,
                     body: body);

簡單起見,我們先假定severity可以是infowarningerror任意一值。

訂閱

馬上就可以像前面的教程接收消息了,但有一點不同, 我們需要為我們感興趣的每種日志嚴重性級別的消息建立一個新的綁定。

var queueName = channel.QueueDeclare().QueueName;

foreach(var severity in args)
{
    channel.QueueBind(queue: queueName,
                      exchange: "direct_logs",
                      routingKey: severity);
}

組合在一起

EmitLogDirect.cs類的代碼:

using System;
using System.Linq;
using RabbitMQ.Client;
using System.Text;

class EmitLogDirect
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "direct_logs",
                                    type: "direct");

            var severity = (args.Length > 0) ? args[0] : "info";
            var message = (args.Length > 1)
                          ? string.Join(" ", args.Skip(1).ToArray())
                          : "Hello World!";
            var body = Encoding.UTF8.GetBytes(message);
            channel.BasicPublish(exchange: "direct_logs",
                                 routingKey: severity,
                                 basicProperties: null,
                                 body: body);
            Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message);
        }

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

ReceiveLogsDirect.cs類的代碼:

using System;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;

class ReceiveLogsDirect
{
    public static void Main(string[] args)
    {
        var factory = new ConnectionFactory() { HostName = "localhost" };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.ExchangeDeclare(exchange: "direct_logs",
                                    type: "direct");
            var queueName = channel.QueueDeclare().QueueName;

            if(args.Length < 1)
            {
                Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",
                                        Environment.GetCommandLineArgs()[0]);
                Console.WriteLine(" Press [enter] to exit.");
                Console.ReadLine();
                Environment.ExitCode = 1;
                return;
            }

            foreach(var severity in args)
            {
                channel.QueueBind(queue: queueName,
                                  exchange: "direct_logs",
                                  routingKey: severity);
            }

            Console.WriteLine(" [*] Waiting for messages.");

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                var routingKey = ea.RoutingKey;
                Console.WriteLine(" [x] Received '{0}':'{1}'",
                                  routingKey, message);
            };
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

請像往常一樣創建項目(請參閱 教程[1])。

如果您想將warningerror(不包括info)日志消息保存到文件,只需打開控制台並輸入:

cd ReceiveLogsDirect
dotnet run warning error > logs_from_rabbit.log

如果您想在屏幕上看到所有日志消息,請打開一個新終端並執行以下操作:

cd ReceiveLogsDirect
dotnet run info warning error
# => [*] Waiting for logs. To exit press CTRL+C

例如,想要發出error日志消息,只需要輸入:

cd EmitLogDirect
dotnet run error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

EmitLogDirect.csReceiveLogsDirect.cs 的完整源代碼。

跳轉到 教程[5],了解如何基於模式監聽消息。

寫在最后

本文翻譯自 RabbitMQ 官方教程 C# 版本。如本文介紹內容與官方有所出入,請以官方最新內容為准。水平有限,翻譯的不好請見諒,如有翻譯錯誤還請指正。

  • 原文鏈接:RabbitMQ tutorial - Routing
  • 實驗環境:RabbitMQ 3.7.4 、.NET Core 2.1.3、Visual Studio Code
  • 最后更新:2018-08-31


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM