RabbitMQ消息隊列(七)-通過fanout模式將消息推送到多個Queue中(.Net Core版)


前面第六章我們使用的是direct直連模式來進行消息投遞和分發。本章將介紹如何使用fanout模式將消息推送到多個隊列。 
有時我們會遇到這樣的情況,多個功能模塊都希望得到完整的消息數據。例如一個log的消息,一個我們希望輸出在屏幕上實時監控,另外一個用戶持久化日志。這時就可以使用fanout模式。fanout模式模式不像direct模式通過routingkey來進行匹配,而是會把消息發送到所以的已經綁定的隊列中。

新建FanoutProduct用來發布消息。FanoutCustomerA和FanoutCustomerB用來訂閱不同隊列消費消息。

FanoutProduct代碼:

using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace FanoutProduct
{
    class Program
    {
        static void Main(string[] args)
        {
            String exchangeName = "wytExchange";
            String message = "Hello World!";

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.63.129";
            factory.Port = 5672;
            factory.VirtualHost = "/wyt";
            factory.UserName = "wyt";
            factory.Password = "wyt";

            using (IConnection connection=factory.CreateConnection())
            {
                using (IModel channel=connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: exchangeName, type: "fanout", durable: true, autoDelete: false, arguments: null);


                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.Persistent = true;

                    Task.Run(() =>
                    {
                        while (true)
                        {
                            for (int i = 0; i < 10000; i++)
                            {
                                Byte[] body = Encoding.UTF8.GetBytes(message + i);
                                channel.BasicPublish(exchange: exchangeName, routingKey: "", basicProperties: properties, body: body);
                            }
                            Thread.Sleep(100);
                        }
                    }).Wait();

                    Console.WriteLine(" [x] Sent {0}", message);
                }
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();


        }
    }
}
View Code

FanoutCustomerA與FanoutCustomerB(代碼相同):

using System;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace FanoutCustomerA
{
    class Program
    {
        static void Main(string[] args)
        {
            String exchangeName = "wytExchange";

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.63.129";
            factory.Port = 5672;
            factory.VirtualHost = "/wyt";
            factory.UserName = "wyt";
            factory.Password = "wyt";

            using (IConnection connection=factory.CreateConnection())
            {
                using (IModel channel=connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: exchangeName, type: "fanout", durable: true, autoDelete: false, arguments: null);

                    String queueName = channel.QueueDeclare().QueueName;

                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: "", arguments: null);

                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        Byte[] body = ea.Body;
                        String message = Encoding.UTF8.GetString(body);
                        Console.WriteLine(" [x] {0}", message);
                    };

                    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);

                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();

                }
            }
        }
    }
}
View Code

 

 

 

可以看到FanoutCustomerA和FanoutCustomerB收到的消息完全一致。注意以上代碼FanoutProduct中並沒有新建隊列,所以先運行FanoutCustomerA和FanoutCustomerB,如果先運行FanoutProduct因為找不到綁定的隊列數據就會丟失。 
還有一種情況我們有可能隨時增加一項處理機制,如果在聲明queue時不指定名字,那么RabbitMQ會隨機為我們生成一個名字,如果不指定queue為持久化隊列那在消息為空並且訂閱者為0時自動刪除該隊列。這樣Queue揮之即來呼之即去。

 

String queueName = channel.QueueDeclare().QueueName;

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM