前面第六章我們使用的是direct直連模式來進行消息投遞和分發。本章將介紹如何使用fanout模式將消息推送到多個隊列。
有時我們會遇到這樣的情況,多個功能模塊都希望得到完整的消息數據。例如一個log的消息,一個我們希望輸出在屏幕上實時監控,另外一個用戶持久化日志。這時就可以使用fanout模式。fanout模式模式不像direct模式通過routingkey來進行匹配,而是會把消息發送到所以的已經綁定的隊列中。
新建FanoutProduct用來發布消息。FanoutCustomerA和FanoutCustomerB用來訂閱不同隊列消費消息。
FanoutProduct代碼:

using System; using System.Text; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace FanoutProduct { class Program { static void Main(string[] args) { String exchangeName = "wytExchange"; String message = "Hello World!"; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.63.129"; factory.Port = 5672; factory.VirtualHost = "/wyt"; factory.UserName = "wyt"; factory.Password = "wyt"; using (IConnection connection=factory.CreateConnection()) { using (IModel channel=connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: "fanout", durable: true, autoDelete: false, arguments: null); IBasicProperties properties = channel.CreateBasicProperties(); properties.Persistent = true; Task.Run(() => { while (true) { for (int i = 0; i < 10000; i++) { Byte[] body = Encoding.UTF8.GetBytes(message + i); channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: properties, body: body); } Thread.Sleep(100); } }).Wait(); Console.WriteLine(" [x] Sent {0}", message); } } Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } }
FanoutCustomerA與FanoutCustomerB(代碼相同):

using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace FanoutCustomerA { class Program { static void Main(string[] args) { String exchangeName = "wytExchange"; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.63.129"; factory.Port = 5672; factory.VirtualHost = "/wyt"; factory.UserName = "wyt"; factory.Password = "wyt"; using (IConnection connection=factory.CreateConnection()) { using (IModel channel=connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: "fanout", durable: true, autoDelete: false, arguments: null); String queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: "", arguments: null); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { Byte[] body = ea.Body; String message = Encoding.UTF8.GetString(body); Console.WriteLine(" [x] {0}", message); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } } } }
可以看到FanoutCustomerA和FanoutCustomerB收到的消息完全一致。注意以上代碼FanoutProduct中並沒有新建隊列,所以先運行FanoutCustomerA和FanoutCustomerB,如果先運行FanoutProduct因為找不到綁定的隊列數據就會丟失。
還有一種情況我們有可能隨時增加一項處理機制,如果在聲明queue時不指定名字,那么RabbitMQ會隨機為我們生成一個名字,如果不指定queue為持久化隊列那在消息為空並且訂閱者為0時自動刪除該隊列。這樣Queue揮之即來呼之即去。
String queueName = channel.QueueDeclare().QueueName;