RabbitMQ (五) 訂閱者模式之分發模式 ( fanout )


前面講到了簡單隊列和工作隊列.

這兩種隊列有個非常明顯的缺點 : 生產者發送的消息,只能進入到一個隊列.

消息只能進入到一個隊列就意味着消息只能被一個消費者消費.

盡管工作隊列模式中,一個隊列中的消息可以被多個消費者消費,但是,具體到每一條消息,卻只能被一個消費者消費.

如果想要一個消息被多個消費者消費,那么生產者就必須把這條消息發送到多個隊列中去.

RabbitMQ 在這個點的設計是 :

在生產者和隊列兩者之間加入了一個叫做"交換機"的東西.

生產者發送消息時,不直接發送到隊列,而是發送到"交換機"(其實簡單隊列和工作隊列也是這樣的...前面的文章有提到,它們用的是默認的交換機).

"交換機"再根據聲明的類型(fanout,direct,topic,headers),轉發給符合要求的隊列.

這里有個非常重要的知識點:

交換機只是一個"中轉的機器",它不是一個消息隊列,它沒有存儲消息的能力.這點很重要!

這意味着,當生產者把消息發送給某個交換機時,如果這時候,這個交換機沒有被任何隊列綁定,那么這些消息將會丟失!

 

這種利用交換機,將消息"發送"到多個隊列的模式叫做 : 訂閱者模式.

這篇文章主要介紹訂閱者模式中的分發模式,

這種模式下,消息會被所有消費者消費.也就是說,只要是"綁定"到某個交換機的隊列,都會收到生產者發送到該交換機的消息.

 

生產者

    public class Producer
    {
        /// <summary>
        /// 交換機名稱
        /// </summary>
        private const string ExchangeName = "test_exchange_fanout";

        public static void Send()
        {
            IConnection connection = ConnectionHelper.GetConnection();
            IModel channel = connection.CreateModel();

            //聲明交換機,第2個參數為交換機類型
            channel.ExchangeDeclare(ExchangeName, "fanout", false, false, null);

            for (int i = 0; i < 50; i++)
            {
                string msg = "hello world " + i;
                //第2個參數為路由鍵,這種模式顯然不需要路由鍵了,因為我們是把消息發送到所有綁定到該交換機的隊列.
                channel.BasicPublish(ExchangeName, "", null, Encoding.Default.GetBytes(msg));
                Console.WriteLine($"send {msg}");
            }
            channel.Close();
            connection.Close();
        }
    }

 

消費者1

    public class Consumer1
    {
        private const string QueueName = "test_exchange1_queue";
        private const string ExchangeName = "test_exchange_fanout";

        public static void Receive()
        {
            IConnection connection = ConnectionHelper.GetConnection();
            IModel channel = connection.CreateModel();
            channel.QueueDeclare(QueueName, false, false, false, null);

            //將隊列綁定到交換機上
            channel.QueueBind(QueueName, ExchangeName, "", null);

            //添加消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //注冊事件
            consumer.Received += (s, e) =>
            {
                byte[] bytes = e.Body;
                string str = Encoding.Default.GetString(bytes);
                Console.WriteLine("consumer1 : " + str);
            };

            channel.BasicConsume(QueueName, true, "", false, false, null, consumer);
        }
    }

 

消費者2

只有這兩句不一樣

        private const string QueueName = "test_exchange2_queue";

        Console.WriteLine("consumer2 : " + str);

 

運行結果就不上圖.

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM