RabbitMQ(六)——路由模式


RabbitMQ系列

RabbitMQ(一)——簡介

RabbitMQ(二)——模式類型

RabbitMQ(三)——簡單模式

RabbitMQ(四)——工作隊列模式

RabbitMQ(五)——發布訂閱模式

RabbitMQ(六)——路由模式

RabbitMQ(七)——主題模式

RabbitMQ(八)——消息確認

RabbitMQ(九)——消息持久化

RabbitMQ(十)——消息優先級

 

前言

  本章講解路由模式,路由模式跟發布訂閱模式類似,然后在發布訂閱模式的基礎上改變了類型(fanout => direct),訂閱模式是分發到所有綁定到交換機的隊列,路由模式只分發到綁定在交換機上面指定路由的隊列,我們可以看一下下面這張圖:

 

以上的圖是info,error,warning為路由,表示日志的等級通過不同的路由發送到不同的隊列中,下面開始路由模式start~~~~

 

發布訂閱模式 VS 路由模式

  發布:

    發布訂閱模式的類型為fanout,而路由模式類型為direct;

 

//發布訂閱模式
channel.ExchangeDeclare(ExchangeName, "fanout");
//路由模式
channel.ExchangeDeclare(ExchangeName, "direct");

 

  

 

    訂閱發布模式發布時roukey參數為空字符串,路由模式指定了路由;

//發布訂閱模式
channel.BasicPublish(ExchangeName, "", null, body);
//路由模式
channel.BasicPublish(ExchangeName, RouteName, null, body);

 

  

 

 

  接收:

      定義交換器時,與發布時一致,發布訂閱模式為fanout,路由模式為direct;

 

//發布訂閱模式
channel.ExchangeDeclare(ExchangeName, "fanout");
//路由模式
channel.ExchangeDeclare(ExchangeName, "direct");

 

  

 

      綁定路由時,發布訂閱模式的routekey參數為空字符串,表示接受所有消息,而路由模式的routekey參數必須指定某一路由。

 

//發布訂閱模式
channel.QueueBind(queueName, ExchangeName, "");
//路由模式
channel.QueueBind(queueName, ExchangeName, routkey);

 

  

 

 

通過以上對比可以知道,路由模式與發布訂閱模式基本一致,唯一差距就是兩個參數,exchange類型和 routingKey

 

代碼

  生產者:

static void Main(string[] args)
        {
            Console.WriteLine("DirectServer發布服務器啟動...");

            //1.創建連接工廠
            var factory = new ConnectionFactory()
            {
                HostName = "127.0.0.1",
                UserName = "guest",
                Password = "guest"
            };
            //2.創建連接
            using (var conn = factory.CreateConnection())
            {
                //3.創建通道
                using (var channel = conn.CreateModel())
                {
                    //4.聲明交換器
                    channel.ExchangeDeclare("directExchange", "direct");

                    string msg = "";
                    for (int i = 0; i < 20; i++)
                    {
                        msg = $"發布消息{i}";
                        string ROUTE_KEY = "";
                        var body = Encoding.UTF8.GetBytes(msg);
                        //模擬向不同路由發送消息
                        if (i % 2 == 0)
                        {
                            ROUTE_KEY = "route1";
                        }
                        else
                        {
                            ROUTE_KEY = "route2";
                        }
                        //5.發布消息
                        channel.BasicPublish("directExchange", ROUTE_KEY, null, body);
                        Console.WriteLine($"向{ROUTE_KEY}發布消息成功:{msg}");

                        Thread.Sleep(1000);
                    }
                    Console.ReadKey();
                }
            }
        }
View Code

 

  消費者1:

static void Main(string[] args)
        {
            Console.WriteLine("DirectClient接收客戶端啟動...");
            //1.創建連接工廠
            var factory = new ConnectionFactory()
            {
                HostName = "127.0.0.1",
                UserName = "guest",
                Password = "guest"
            };
            //2.創建連接
            using (var conn = factory.CreateConnection())
            {
                //3.創建通道
                using (var channel = conn.CreateModel())
                {
                    //3.聲明隊列
                    var queue = channel.QueueDeclare().QueueName;
                    //4.綁定交換器
                    channel.QueueBind(queue, "directExchange", "route1");
                    //5.聲明消費者 消費消息
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                      {
                          //接收消息
                          var body = Encoding.UTF8.GetString(ea.Body.ToArray());
                          Console.WriteLine($"接收route1消息:{body.ToString()}");
                      };
                    channel.BasicConsume(queue, true, consumer);

                    Console.ReadKey();
                }
            }
        }
View Code

  

  消費者2:

static void Main(string[] args)
        {
            Console.WriteLine("DirectClient接收客戶端啟動...");
            //1.創建連接工廠
            var factory = new ConnectionFactory()
            {
                HostName = "127.0.0.1",
                UserName = "guest",
                Password = "guest"
            };
            //2.創建連接
            using (var conn = factory.CreateConnection())
            {
                //3.創建通道
                using (var channel = conn.CreateModel())
                {
                    //3.聲明隊列
                    var queue = channel.QueueDeclare().QueueName;
                    //4.綁定交換器
                    channel.QueueBind(queue, "directExchange", "route2");
                    //5.聲明消費者 消費消息
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, e) =>
                    {
                        byte[] message = e.Body.ToArray();
                        Console.WriteLine("接收消息:" + Encoding.UTF8.GetString(message));
                            //返回消息確認
                            channel.BasicAck(e.DeliveryTag, false);
                    };
                    //消費者開始監聽
                    channel.BasicConsume(queue, true, consumer);

                    Console.ReadKey();
                }
            }
        }
View Code

 

  可以看到生產者發布消息時指定了路由rout1/rout2,隨后消息會轉發到route1/route2路由上。接收時通過將一個隊列與交換器綁定,指定路由route1/route2,這樣就能接收到route1/route2路由上的消息。

 

效果

  消費者定義的隨機隊列

 

 

                                                

 

  向route1、route2發布消息,兩個消費者分別接收route1和route2的消息

 

效果

  路由模式中生產者發布消息時指定路由,向指定路由發送,消費者綁定交換器與路由即可接收到生產者向此路由發布的消息;

  只有將消費者發送消息的交換器、路由 與生產者指定的交換器、路由一致,消費者才能接收到生產者向指定路由的消費者發送的消息。

 

  注意:聲明路由時類型必須為direct

 

附上Demo地址:https://github.com/1164887865/RabbitMQDemo

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM