前兩章我們講了RabbitMQ的direct模式和fanout模式,本章介紹topic主題模式的應用。如果對direct模式下通過routingkey來匹配消息的模式已經有一定了解那fanout也很好理解。簡單的可以理解成direct是通過routingkey精准匹配的,而topic是通過routingkey來模糊匹配。
在topic模式下支持兩個特殊字符的匹配。
* (星號) 代表任意 一個單詞
# (井號) 0個或者多個單詞
注意:上面說的是單詞不是字符。
如下圖所示,RabbitMQ direct模式通過RoutingKey來精准匹配,RoutingKey為red的投遞到Queue1,RoutingKey為black和white的投遞到Queue2。
我們可以假設一個場景,我們要做一個日志模塊來收集處理不同的日志,日志區分包含三個維度的標准:模塊、日志緊急程度、日志重要程度。模塊分為:red、black、white;緊急程度分為:critical、normal;把重要程度分為:medium、low、high在RoutingKey字段中我們把這三個維度通過兩個“.“連接起來。
現在我們需要對black模塊,緊急程度為critical,重要程度為high的日志分配到隊列1打印到屏幕;對所以模塊重要程度為high的日志和white緊急程度為critical的日志發送到隊列2持久化到硬盤。如下示例:
-
RoutingKey為“black.critical.high”的日志會投遞到queue1和queue2,。
-
RoutingKey為“red.critical.high”的日志會只投遞到queue2。
-
RoutingKey為“white.critical.high”的日志會投遞到queue2,並且雖然queue2的兩個匹配規則都符合但只會向queue2投遞一份。
新建TopicProduct用來發布三種routingkey的消息。

using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace TopicProduct { class Program { static void Main(string[] args) { String exchangeName = "wytExchangeTopic"; String routeKeyName1 = "black.critical.high"; String routeKeyName2 = "red.critical.high"; String routeKeyName3 = "white.critical.high"; String message1 = "black-critical-high!"; String message2 = "red-critical-high!"; String message3 = "white-critical-high!"; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.63.129"; factory.Port = 5672; factory.VirtualHost = "/wyt"; factory.UserName = "wyt"; factory.Password = "wyt"; using (IConnection connection=factory.CreateConnection()) { using (IModel channel=connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: "topic", durable: true, autoDelete: false, arguments: null); IBasicProperties properties = channel.CreateBasicProperties(); properties.Persistent = true; Byte[] body1 = Encoding.UTF8.GetBytes(message1); Byte[] body2 = Encoding.UTF8.GetBytes(message2); Byte[] body3 = Encoding.UTF8.GetBytes(message3); //消息推送 channel.BasicPublish(exchange: exchangeName, routingKey:routeKeyName1,basicProperties: properties, body: body1); channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName2, basicProperties: properties, body: body2); channel.BasicPublish(exchange: exchangeName, routingKey: routeKeyName3, basicProperties: properties, body: body3); Console.WriteLine(" [x] Sent {0}", message1); Console.WriteLine(" [x] Sent {0}", message2); Console.WriteLine(" [x] Sent {0}", message3); } } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
新建TopicCustomerA接收一種消息

using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace TopicCustomerA { class Program { static void Main(string[] args) { String exchangeName = "wytExchangeTopic"; String routeKeyName1 = "black.critical.high"; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.63.129"; factory.Port = 5672; factory.VirtualHost = "/wyt"; factory.UserName = "wyt"; factory.Password = "wyt"; using (IConnection connection=factory.CreateConnection()) { using (IModel channel=connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: "topic", durable: true, autoDelete: false, arguments: null); String queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName1, arguments: null); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } } }
新建TopicCustomerB接收兩種消息

using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace TopicCustomerB { class Program { static void Main(string[] args) { String exchangeName = "wytExchangeTopic"; String routeKeyName1 = "red.critical.*"; String routeKeyName2 = "white.critical.*"; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.63.129"; factory.Port = 5672; factory.VirtualHost = "/wyt"; factory.UserName = "wyt"; factory.Password = "wyt"; using (IConnection connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: "topic", durable: true, autoDelete: false, arguments: null); String queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName1, arguments: null); channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routeKeyName2, arguments: null); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } } }
先運行TopicCustomerA和TopicCustomerB保持訂閱狀態。然后執行TopicProduct發布消息。TopicCustomerA和TopicCustomerB收到的消息如下:
如上截圖,驗證了我們之前的結論。
另外還有一些特殊情況例如:
如果binding_key 是 “#” - 它會接收所有的Message,不管routing_key是什么,就像是fanout
exchange。
如果 “*” and “#” 沒有被使用,那么topic exchange就變成了direct exchange。