RabbitMQ(五)——發布訂閱模式


RabbitMQ系列

RabbitMQ(一)——簡介

RabbitMQ(二)——模式類型

RabbitMQ(三)——簡單模式

RabbitMQ(四)——工作隊列模式

RabbitMQ(五)——發布訂閱模式

RabbitMQ(六)——路由模式

RabbitMQ(七)——主題模式

RabbitMQ(八)——消息確認

RabbitMQ(九)——消息持久化

RabbitMQ(十)——消息優先級

 

前言

  上一章的工作隊列模式中,生產者發布的一堆消息進入隊列,消費者接收隊列中的消息,每條消息只能發給一個消費者。

  本章要做的是吧一條消息發送給多個消費者,這種模式就是Fanout Exchange(扇形交換機)“發布/訂閱模式”,它會將消息路由給綁定到它身上的所有隊列。

  注意:該模式定義隊列時durable:false沒有存儲消息功能,如果消息發送到沒有綁定消費隊列的交換器,消息丟失,也就是說,當消息生產者發送消息時,消費者還沒有綁定此交換器,則沒有接收到消息,且接收不到了;durable:true只要沒被消費就一直存在隊列中。

 

  交換器:

  在RabbitMQ中完整的消息傳遞並不是生產者直接將消息發送到隊列,生產者甚至不不知道消息是否會進入隊列。正確的模型是生產者把消息發送給交換器(Exchange),交換器會將接受到的消息轉發到隊列中,交換器必須確定將消息發送到哪些隊列。  

  

 

  交換器類型(詳情):

      • Fanout     
      • Direct
      • Topic
      • Header

  

  fanout類型即發布訂閱模式,它會把收到的消息廣播到它綁定的隊列中。

channel.ExchangeDeclare("exchange", "fanout");

  

  之前我們發布的做法是這樣的:

channel.BasicPublish("", "simple", null, Encoding.UTF8.GetBytes(msg));

  

  前面我們並沒有聲明交換器,之所以還能把消息發送到隊列是因為用了“”空的字符串標識了默認或匿名交換器。BasicPublish方法第一個參數就是交換器,第二個參數是routekey,消息通過本交換器進入到routekey隊列。

 

  發送消息到指定交換器:

channel.BasicPublish("exchange", "simple", null, Encoding.UTF8.GetBytes(msg)); 

  

  創建好fanout交換器后,定義多個隊列,然后將交換器和隊列綁定:

channel.QueueBind("simple", "exchange", "");

  

    

 

代碼

  生產者:

static void Main(string[] args)
        {
            Console.WriteLine("FanoutServer發布服務器啟動...");

            //1.創建連接工廠
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "127.0.0.1",
                UserName = "guest",
                Password = "guest"
            };
            //2.創建連接
            using (var connection = factory.CreateConnection())
            //3.創建通道
            using (var channel = connection.CreateModel())
            {
                //4.創建交換器
                channel.ExchangeDeclare("exchange", "fanout");

                string msg = "";

                for (int i = 0; i < 100; i++)
                {
                    msg = $"發布消息{i}";
                    var body = Encoding.UTF8.GetBytes(msg);
                    channel.BasicPublish("exchange", "", null, body);
                    Console.WriteLine($"發布成功:{msg}");
                    Thread.Sleep(600);
                }
                Console.ReadKey();
            }
        }
View Code

 

   消費者:

static void Main(string[] args)
        {
            Console.WriteLine("FanoutClient接收客戶端啟動...");
            //1.創建連接工廠
            ConnectionFactory factory = new ConnectionFactory()
            {
                HostName = "127.0.0.1",
                UserName = "guest",
                Password = "guest"
            };
            //2.創建連接
            using (var connection = factory.CreateConnection())
            {
                //3.創建通道
                using (var channel = connection.CreateModel())
                {
                    //4.定義交換器
                    channel.ExchangeDeclare("exchange", "fanout");
                    //5.創建匿名隊列,綁定交換器
                    //var queueName = channel.QueueDeclare("simple");
                    var queueName = channel.QueueDeclare().QueueName;
                    channel.QueueBind(queueName, "exchange", "");

                    //6.創建消費者
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                      {
                          //接收消息
                          var body = Encoding.UTF8.GetString(ea.Body.ToArray());
                          Console.WriteLine($"接收消息:{body.ToString()}");
                      };
                    //7.消費消息
                    channel.BasicConsume(queueName, true, consumer);

                    Console.ReadKey();

                }
            }
        }
View Code

 

 生產者創建完通道后,創建一個交換器,給它命名“exchange”以及設定“fanout”類型,最后發布消息到此交換器,與前面的模式不同的是不用設定隊列參數。

                                                                                                                                                                                                  

消費者創建完通道后,創建一個交換器,與生產者的交換器一致,定義一個隊列(可以指定,或者使用匿名),然后將此隊列與交換器綁定,消息會在此隊列中消費,最后創建消費者進行消費。

 

效果

    先啟動2個消費者,再啟動生產者,消息完全接受到。

    

    先啟動生產者,再分別啟動消費者,只能接受到消費者啟動之后生產發送的消息

 

發布訂閱模式就先到這,有什么不對的地方望斧正~

 

附上Demo地址:https://github.com/1164887865/RabbitMQDemo

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM