在前一篇中,我們構建了一個簡單的日志系統,我們已經能夠廣播消息到許多的接收者。在這一篇中,我們希望增加一個特性,讓訂閱消息的子集成為可能。例如,我們可以將重要的錯誤日志存放到日志文件(即,磁盤上面),同時將仍然所有的日志信息打印到控制台。
綁定
在前面的例子中我們已經創建過綁定,你應該還能記得下面的代碼:
1 channel.QueueBind(queue: queueName, 2 exchange: "logs", 3 routingKey: "");
一個綁定是一個交換器和一個隊列的關系,可以形象的解讀為:這個隊列對來自這個交換器的消息感興趣。綁定可以擁有一個額外的參數:routingKey,為了避免和BasicPublish里面的routingKey混淆,我們稱其為綁定關鍵字(binding Key),下面展示了如何通過一個綁定關鍵字創建一個綁定:
1 channel.QueueBind(queue: queueName, 2 exchange: "direct_logs", 3 routingKey: "black");
綁定關鍵字的意義依賴於交換器的類型,前面我們使用過的fanout類型的交換器,直接簡單的忽略了它的綁定關鍵字的值。
Direct交換器
前一篇教程中的日志系統廣播所有的消息到所有的消費者,我們希望擴展它能根據嚴重程度對消息進行過濾。例如,我們希望寫日志到磁盤的程序能夠只接收重要的錯誤,不要在“警告”、“信息”級別的日志身上浪費磁盤空間。
我們使用fanout類型的交換器,並不能帶來很大的靈活性,它只能盲目的廣播。作為替代,我們將使用direct類型的交換器,這種交換器背后的路由算法非常簡單,消息發送到綁定關鍵字(binding Key)和消息的路由關鍵字(routing Key)完全匹配的隊列。
為了幫助說明,設想下面的配置:
在這個設置中,我們看到direct類型的交換器X綁定了兩個隊列.第一個隊列綁定關鍵字是orange;第二個隊列有兩個綁定,一個是black,另一個是green。在這樣的配置中,發送到使用orange為路由關鍵字的消息將被路由到Q1,使用black或green為路由關鍵字的消息將被路由到隊列Q2,其他消息將被銷毀。
多綁定
使用同一個綁定關鍵字綁定多個隊列是完全合法的,在我們的示例中我們可以在X和Q1中添加一個以black為綁定關鍵字的綁定。在這種情況下,direct類型的交換器會展現出fanout類型交換器的行為,並且會廣播消息到所有滿足條件的隊列,發送到路由關鍵字為black的消息會發送到Q1和Q2.
發送日志
我們會為日志系統使用這種模式,將消息發送到direct類型而非fanout類型的交換器。我們將使用日志的嚴重等級來作為路由關鍵字,這樣的話消息的接收程序就可以依據自己的需要選擇不同的嚴重等級。首先讓我們把注意力集中到發送日志上。
和往常一樣,我們首先需要創建一個交換器:
1 channel.ExchangeDeclare(exchange: "direct_logs", type: "direct");
接着我們准備好發送消息了:
1 var body = Encoding.UTF8.GetBytes(message); 2 channel.BasicPublish(exchange: "direct_logs", 3 routingKey: severity, 4 basicProperties: null, 5 body: body);
為了簡化問題,我們假設嚴重等級是‘info’、‘warning’、‘error’。
訂閱
馬上就可以像前面的教程中那樣接收消息了,但是有一個不同點,我們要用自己感興趣的所有日志嚴重程度分別創建一個綁定。
1 var queueName = channel.QueueDeclare().QueueName; 2 3 foreach(var severity in args) 4 { 5 channel.QueueBind(queue: queueName, 6 exchange: "direct_logs", 7 routingKey: severity); 8 }
組合在一起
EmitLogDirect.cs類的代碼:
1 using System; 2 using System.Linq; 3 using RabbitMQ.Client; 4 using System.Text; 5 6 class EmitLogDirect 7 { 8 public static void Main(string[] args) 9 { 10 var factory = new ConnectionFactory() { HostName = "localhost" }; 11 using(var connection = factory.CreateConnection()) 12 using(var channel = connection.CreateModel()) 13 { 14 channel.ExchangeDeclare(exchange: "direct_logs", 15 type: "direct"); 16 17 var severity = (args.Length > 0) ? args[0] : "info"; 18 var message = (args.Length > 1) 19 ? string.Join(" ", args.Skip( 1 ).ToArray()) 20 : "Hello World!"; 21 var body = Encoding.UTF8.GetBytes(message); 22 channel.BasicPublish(exchange: "direct_logs", 23 routingKey: severity, 24 basicProperties: null, 25 body: body); 26 Console.WriteLine(" [x] Sent '{0}':'{1}'", severity, message); 27 } 28 29 Console.WriteLine(" Press [enter] to exit."); 30 Console.ReadLine(); 31 } 32 }
ReceiveLogsDirect.cs的代碼:
1 using System; 2 using RabbitMQ.Client; 3 using RabbitMQ.Client.Events; 4 using System.Text; 5 6 class ReceiveLogsDirect 7 { 8 public static void Main(string[] args) 9 { 10 var factory = new ConnectionFactory() { HostName = "localhost" }; 11 using(var connection = factory.CreateConnection()) 12 using(var channel = connection.CreateModel()) 13 { 14 channel.ExchangeDeclare(exchange: "direct_logs", 15 type: "direct"); 16 var queueName = channel.QueueDeclare().QueueName; 17 18 if(args.Length < 1) 19 { 20 Console.Error.WriteLine("Usage: {0} [info] [warning] [error]", 21 Environment.GetCommandLineArgs()[0]); 22 Console.WriteLine(" Press [enter] to exit."); 23 Console.ReadLine(); 24 Environment.ExitCode = 1; 25 return; 26 } 27 28 foreach(var severity in args) 29 { 30 channel.QueueBind(queue: queueName, 31 exchange: "direct_logs", 32 routingKey: severity); 33 } 34 35 Console.WriteLine(" [*] Waiting for messages."); 36 37 var consumer = new EventingBasicConsumer(channel); 38 consumer.Received += (model, ea) => 39 { 40 var body = ea.Body; 41 var message = Encoding.UTF8.GetString(body); 42 var routingKey = ea.RoutingKey; 43 Console.WriteLine(" [x] Received '{0}':'{1}'", 44 routingKey, message); 45 }; 46 channel.BasicConsume(queue: queueName, 47 noAck: true, 48 consumer: consumer); 49 50 Console.WriteLine(" Press [enter] to exit."); 51 Console.ReadLine(); 52 } 53 } 54 }
像通常那樣編譯(編譯建議見教程一)。
如果你只想保存嚴重等級為‘warning’、‘error’的日志到文件中,只需要打開控制台然后輸入:
1 $ ReceiveLogsDirect.exe warning error > logs_from_rabbit.log
如果你想在屏幕上看到所有的日志,打開另一個終端輸入:
1 $ ReceiveLogsDirect.exe info warning error 2 [*] Waiting for logs. To exit press CTRL+C
如果想發送‘error’級別的日志,舉例來說,你只需要輸入如下信息:
1 $ EmitLogDirect.exe error "Run. Run. Or it will explode." 2 [x] Sent 'error':'Run. Run. Or it will explode.'
進入教程五,了解如何基於模式(pattern)監聽消息。
原文鏈接:http://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html