RabbitMQ的發布訂閱模式(Publish/Subscribe)(三)


目錄

本系列向大家介紹RabbitMQ的簡單用法;

1. RabbitMQ的簡單實踐

2. RabbitMQ的輪詢模式和公平分發

3. RabbitMQ的發布訂閱模式(Publish/Subscribe)

4. RabbitMQ路由模式(Routing)

5. RabbitMQ的主題(Topic)模式

一、發布/訂閱(Publish/Subscribe)模式

發布訂閱是我們經常會用到的一種模式,生產者生產消息后,所有訂閱者都可以收到。RabbitMQ的發布/訂閱模型圖如下:

1、該模式下生產者並不是直接操作隊列,而是將數據發送給交換機,由交換機將數據發送給與之綁定的隊列;
2、該模式必須聲明交換機,並且設置模式: channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);  
fanout指分發模式(將每一條消息都發送到與交換機綁定的隊)。
3、隊列必須綁定交換機:channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

二、發布消息

消息生產者向交換機(exchange)發送消息,代碼如下:

   // 定義交換機名稱
        static string EXCHANGE_NAME = "ps_exchange_fanout";
        public static void PublishMessage()
        {
            try
            {
                var conn = RabbitMQHelper.GetConnection();
                var channel = conn.CreateModel();
                // 定義exchange
                channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);
                string msg = "hello ps!";
                var body = Encoding.UTF8.GetBytes(msg);
                channel.BasicPublish(EXCHANGE_NAME, "", null, body);
                Console.WriteLine("send msg:" + msg);
                channel.Close();
                conn.Close();
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

消息發送成功,截圖如下:

三、訂閱消息

在這里需要兩個消費者,消息發送后,所有的訂閱者都可以收到消息;

3.1 消費者1

和輪詢分發以及公平分發不同的是,消費者需要將隊列綁定到交換機,來訂閱消息;實現代碼如下:

        static string EXCHANGE_NAME = "ps_exchange_fanout";
        static string QUEUE_NAME = "ps_queue_sub1";
        /// <summary>
        /// 訂閱消費者1
        /// </summary>
        static void SubscribeConsumer1()
        {
            var conn = RabbitMQHelper.GetConnection();
            var channel = conn.CreateModel();
            // 定義exchange
            channel.ExchangeDeclare(EXCHANGE_NAME, type: ExchangeType.Fanout);
            // 綁定queue
            channel.QueueDeclare(queue: QUEUE_NAME);
            channel.QueueBind(QUEUE_NAME, EXCHANGE_NAME, "");
            // 定義Consumer
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model,ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body.ToArray());
                Console.WriteLine($"SubscribeConsumer1 收到消息: {message},時間:{DateTime.Now}");
            };
            //啟動消費者 設置為手動應答消息
            channel.BasicConsume(queue: QUEUE_NAME, autoAck: true, consumer: consumer);
            Console.WriteLine("Subscribe Consumer1 消費者已啟動");
            Console.ReadKey();
            channel.Dispose();
            conn.Close();
        }

3.2 消費者2

消費者2的代碼和1的基本相同,大家可以將1的修改一下,就可以使用,在此就不重復貼出了;

3.3 接收消息結果

消費者1接收消息截圖:

消費者2接收消息截圖:

通過上圖,我們可以看到,發布者發布消息后,訂閱者1、2均受到了相同的消息,至此功能已經完成;

四、小結

4.1 訂閱者代碼的主要流程

根據消費者的代碼,我們可以提煉流程如下
(1)創建連接
(2)聲明exchange
(3)綁定隊列到exchange
(4)聲明消費者
(5)綁定消費者到channel,監聽處理消息
(6)關閉連接

4.2 訂閱成功后,我們打開mq的管理地址可以看到,有兩個queue綁定到exchange上了:


注意:如果我們的exchange沒有消費者訂閱,發布的消息將不會被保存到任何隊列,直接丟失了;
參考鏈接:https://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM