RabbitMQ系列教程之五:主題(Topic)


(本實例都是使用的Net的客戶端,使用C#編寫),說明,中文方括號【】表示名詞。

   在上一個教程中,我們改進了我們的日志記錄系統。 沒有使用只能夠進行虛擬廣播的【Fanout】交換機,而是使用了【Direct】類型的交換機,這樣做就可以讓我們有可能選擇性地接收日志。

  雖然使用【Direct】類型的【消息交換機】改進了我們的系統,但它仍然有限制 - 它不能基於多個標准進行路由選擇。


  在我們的日志記錄系統中,我們可能不僅要根據嚴重性訂閱日志,還可以基於發出日志的源進行訂閱。 您可能會從syslog unix工具中了解這一概念,該工具根據嚴重性(info/warn/crit...)和設施(auth / cron / kern ...)路由日志。

   這將給我們很大的靈活性 - 我們可能既想監聽來自“cron”的重要錯誤,也可以監聽“kern”的所有日志。

   要在我們的日志記錄系統中實現,我們需要了解一個更為復雜的topic類型的【消息交換機】。

1、Topic類型的【消息交換機】

   發送到【Topic】類型【消息交換機】的消息不能有任意的routing_key - 它必須是由點分隔的單詞列表。 這些詞可以是任何東西,但通常它們指定與消息相關聯的一些功能。 幾個有效的路由關鍵字示例:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。 路由關鍵字中可以有任意多的單詞,最多可達255個字節。

   綁定鍵也必須是相同的形式。【Topic】類型的【消息交換機】背后的邏輯類似於【Direct】類型的【消息交換機】 - 使用特定【路由鍵】發送的消息將被傳遞到與匹配的【綁定鍵】綁定的所有隊列。 但是,【綁定鍵】有兩個重要的特殊情況:

     *(星)可以替代一個字。
     #(井號)可以替換零個或多個單詞。

    在一個例子中最簡單的解釋一下:
   

    在這個例子中,我們將發送所有描述動物的消息。消息將使用由三個字(兩個點)組成的【路由鍵】發送。【路由鍵】中的第一個字將描述速度,第二個顏色和第三個種類:“<speed>.<color>.<species>”。

    我們創建了三個綁定:Q1綁定鍵“*.orange.*”和Q2是“*.*.rabbit”和“lazy.#”綁定。

    這些【綁定鍵】所要表達意思可以總結為:

    Q1對所有的橙色動物感興趣。

    Q2想聽聽有關兔子的一切,以及關於lazy動物的一切。

    將【路由鍵】設置為“quick.orange.rabbit”的消息將傳遞給兩個隊列。消息“lazy.orange.elephant”也會發送他們那兩個隊列。另一方面,“quick.orange.fox”只會發送到第一個隊列,而“lazy.brown.fox”只能發送到第二個隊列。 “lazy.pink.rabbit”將被傳遞到第二個隊列只有一次,即使它匹配兩個綁定。 “quick.brown.fox”不匹配任何綁定,所以它將被丟棄。

   如果我們違反約定並發送一個或四個字的消息,如“orange”或“quick.orange.male.rabbit”,會發生什么?那么這些消息將不會匹配任何綁定,並將丟失。

   另一方面,“lazy.orange.male.rabbit”即使它有四個字,將匹配上一個綁定,並將被傳遞到第二個隊列。

   說明:【Topic】類型的【消息交換機】

     此類型的【消息交換機】是強大的,可以像其他【消息交換機】一樣行事。

     當隊列用“#”(哈希)【綁定鍵】綁定時,它將接收所有消息,而不管【路由鍵】,就像使用【Fanout】類型的【消息交換機】。

     當特殊字符“*”(星號)和“#”(哈希)不用於綁定時,【Topic】類型的【消息交換機】將表現得像一個使用【Direct】類型的【消息交換機】。


2、代碼整合

  我們將在我們的日志記錄系統中使用【Topic】類型【消息交換機】。 我們將從一個工作假設開始,日志的【路由鍵】將有兩個單詞組成:“<facility>.<severity>”。

代碼與上一個教程幾乎相同。

EmitLogTopic.cs的代碼:

 1 using System;
 2 using System.Linq;
 3 using RabbitMQ.Client;
 4 using System.Text;
 5 
 6 class EmitLogTopic
 7 {
 8     public static void Main(string[] args)
 9     {
10         var factory = new ConnectionFactory() { HostName = "localhost" };
11         using(var connection = factory.CreateConnection())
12         using(var channel = connection.CreateModel())
13         {
14             channel.ExchangeDeclare(exchange: "topic_logs",
15                                     type: "topic");
16 
17             var routingKey = (args.Length > 0) ? args[0] : "anonymous.info";
18             var message = (args.Length > 1)
19                           ? string.Join(" ", args.Skip( 1 ).ToArray())
20                           : "Hello World!";
21             var body = Encoding.UTF8.GetBytes(message);
22             channel.BasicPublish(exchange: "topic_logs",
23                                  routingKey: routingKey,
24                                  basicProperties: null,
25                                  body: body);
26             Console.WriteLine(" [x] Sent '{0}':'{1}'", routingKey, message);
27         }
28     }
29 }


ReceiveLogsTopic.cs的代碼:

 1 using System;
 2 using RabbitMQ.Client;
 3 using RabbitMQ.Client.Events;
 4 using System.Text;
 5 
 6 class ReceiveLogsTopic
 7 {
 8     public static void Main(string[] args)
 9     {
10         var factory = new ConnectionFactory() { HostName = "localhost" };
11         using(var connection = factory.CreateConnection())
12         using(var channel = connection.CreateModel())
13         {
14             channel.ExchangeDeclare(exchange: "topic_logs", type: "topic");
15             var queueName = channel.QueueDeclare().QueueName;
16 
17             if(args.Length < 1)
18             {
19                 Console.Error.WriteLine("Usage: {0} [binding_key...]",
20                                         Environment.GetCommandLineArgs()[0]);
21                 Console.WriteLine(" Press [enter] to exit.");
22                 Console.ReadLine();
23                 Environment.ExitCode = 1;
24                 return;
25             }
26 
27             foreach(var bindingKey in args)
28             {
29                 channel.QueueBind(queue: queueName,
30                                   exchange: "topic_logs",
31                                   routingKey: bindingKey);
32             }
33 
34             Console.WriteLine(" [*] Waiting for messages. To exit press CTRL+C");
35 
36             var consumer = new EventingBasicConsumer(channel);
37             consumer.Received += (model, ea) =>
38             {
39                 var body = ea.Body;
40                 var message = Encoding.UTF8.GetString(body);
41                 var routingKey = ea.RoutingKey;
42                 Console.WriteLine(" [x] Received '{0}':'{1}'",
43                                   routingKey,
44                                   message);
45             };
46             channel.BasicConsume(queue: queueName,
47                                  noAck: true,
48                                  consumer: consumer);
49 
50             Console.WriteLine(" Press [enter] to exit.");
51             Console.ReadLine();
52         }
53     }
54 }


3、運行以下示例:

收到所有的日志:

cd ReceiveLogsTopic
dotnet run“#”


從設備“kern”接收所有日志:

cd ReceiveLogsTopic
dotnet run “kern.*”

或者如果您只想聽到關於“critical”日志的信息:

ReceiveLogsTopic.exe“* .critical”


您可以創建多個綁定:

cd ReceiveLogsTopic
dotnet run“kern.*”“*.critical”


並使用【路由鍵】“kern.critical”類型發出日志:

cd emitLogTopic
dotnet run“kern.critical”“A critial kernel error”

寫這些程序很有趣。 請注意,代碼不會對【路由鍵】或【綁定鍵】做任何假設,您可能希望使用兩個以上的【路由鍵】參數進行操作。

今天就到此為止了,如果英文比較好的,可以查看原文,原文地址是:http://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM