快速掌握RabbitMQ(二)——四種Exchange介紹及代碼演示


  在上一篇的最后,編寫了一個C#驅動RabbitMQ的簡單栗子,了解了C#驅動RabbitMQ的基本用法。本章介紹RabbitMQ的四種Exchange及各種Exchange的使用場景。

1 direct類型

1 direct路由規則

    上一篇最后一個栗子使用的Exchange就是direct類型的,direct類型的exchange路由規則很簡單:

  exchange在和queue進行binding時會設置routingkey(為了避免和下邊的routingKey混淆,很多時候把這里的routingKey叫做BindingKey)

channel.QueueBind(queue:"Q1", exchange:"myexchange", routingKey:"orange");

  將消息發送到Broker時會設置對應的routingkey

 channel.BasicPublish(exchange: "myexchange",routingKey: "orange", basicProperties: null, body: body);

  只有RoutingKey和BindingKey完全相同時,exchange才會把消息路由到綁定的queue中去

2 代碼示例

  我們知道了direact類型的交換機只有routingKey和bindingKey相同的時候才會進行消息路由,根據這一特點我們可以通過routingKey將消息路由到不同的queue中。如在進行日志處理時,需求是所有的日志都保存到文本文件,出現錯誤日志時則還需要短信通知以便及時處理。我們可以創建兩個隊列:只接收錯誤日志的log_error隊列和接收所有日志信息的log_all隊列。消費者C1處理log_error隊列中消息,將這些消息通過短信通知管理員,消費者C2處理log_all隊列的信息,將這些信息記錄到文本文件。

  生產者用於發送日志消息,代碼如下:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                HostName = "127.0.0.1",
                UserName = "wyy",//用戶名
                Password = "123321"//密碼
            };
            //創建連接connection
            using (var connection = factory.CreateConnection())
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    //聲明交換機exchang
                    channel.ExchangeDeclare(exchange: "myexchange",
                                            type: ExchangeType.Direct,
                                            durable: true,
                                            autoDelete: false,
                                            arguments: null);
                    //聲明兩個隊列,log_all保存所有日志,log_error保存error類型日志
                    channel.QueueDeclare(queue: "log_all",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                    channel.QueueDeclare(queue: "log_error",
                                        durable: true,
                                        exclusive: false,
                                        autoDelete: false,
                                        arguments: null);
                    //綁定所有日志類型到log_all隊列
                    string[] logtypes = new string[] { "debug", "info", "warn", "error" };
                    foreach (string item in logtypes)
                    {
                        channel.QueueBind(queue: "log_all",
                                exchange: "myexchange",
                                routingKey: item);
                    }
                    //綁定錯誤日志到log_all隊列
                    channel.QueueBind(queue: "log_error",
                                exchange: "myexchange",
                                routingKey: "error");
                    //准備100條測試日志信息
                    List<LogMsg> msgList = new List<LogMsg>();
                    for (int i = 1; i < 100; i++)
                    {
                        if (i%4==0)
                        {
                            msgList.Add(new LogMsg() { LogType = "info", Msg = Encoding.UTF8.GetBytes($"info第{i}條信息") });
                        }
                        if (i % 4 == 1)
                        {
                            msgList.Add(new LogMsg() { LogType = "debug", Msg = Encoding.UTF8.GetBytes($"debug第{i}條信息") });
                        }
                        if (i % 4 == 2)
                        {
                            msgList.Add(new LogMsg() { LogType = "warn", Msg = Encoding.UTF8.GetBytes($"warn第{i}條信息") });
                        }
                        if (i % 4 == 3)
                        {
                            msgList.Add(new LogMsg() { LogType = "error", Msg = Encoding.UTF8.GetBytes($"error第{i}條信息") });
                        }
                    }
                    Console.WriteLine("生產者發送100條日志信息");
                    //發送日志信息
                    foreach (var item in msgList)
                    {
                        channel.BasicPublish(exchange: "myexchange",
                                            routingKey: item.LogType,
                                            basicProperties: null,
                                            body: item.Msg);
                    }
                }
            }
            Console.ReadKey();
        }
    }
View Code

  消費者C1用於處理log_error隊列中的消息,錯誤消息進行短信通知,代碼如下:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在設備ip,這里就是本機
                HostName = "127.0.0.1",
                UserName = "wyy",//用戶名
                Password = "123321"//密碼
            };
            //創建連接connection
            using (var connection = factory.CreateConnection())
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    //聲明交換機exchang
                    channel.ExchangeDeclare(exchange: "myexchange",
                                            type: ExchangeType.Direct,
                                            durable: true,
                                            autoDelete: false,
                                            arguments: null);
                    //聲明隊列queue
                    channel.QueueDeclare(queue: "log_all",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);

                    //綁定
                    channel.QueueBind(queue: "log_error",
                            exchange: "myexchange",
                            routingKey: "error");

                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        //只是為了演示,並沒有存入文本文件
                        Console.WriteLine($"接收成功!【{message}】,發送短信通知");
                    };
                    Console.WriteLine("消費者C1【接收錯誤日志,發送短信通知】准備就緒....");
                    //處理消息
                    channel.BasicConsume(queue: "log_error",
                                         autoAck: true,
                                         consumer: consumer);
                    Console.ReadLine();
                }
            }
        }
View Code

  消費者C2用於處理log_all隊列中的消息,所有消息記錄到文本文件中,代碼如下:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在設備ip,這里就是本機
                HostName = "127.0.0.1",
                UserName = "wyy",//用戶名
                Password = "123321"//密碼
            };
            //創建連接connection
            using (var connection = factory.CreateConnection())
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    //聲明交換機exchang
                    channel.ExchangeDeclare(exchange: "myexchange",
                                            type: ExchangeType.Direct,
                                            durable: true,
                                            autoDelete: false,
                                            arguments: null);
                    //聲明隊列queue
                    channel.QueueDeclare(queue: "log_all",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);

                    //綁定
                    string[] logtypes = new string[] { "debug", "info", "warn", "error" };
                    foreach (string item in logtypes)
                    {
                        channel.QueueBind(queue: "log_all",
                                exchange: "myexchange",
                                routingKey: item);
                    }
                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        //只是為了演示,並沒有存入文本文件
                        Console.WriteLine($"接收成功!【{message}】,存入文本文件");
                    };
                    Console.WriteLine("消費者C2【接收所有日志信息,存入文本文件】准備就緒....");
                    //處理消息
                    channel.BasicConsume(queue: "log_all",
                                         autoAck: true,
                                         consumer: consumer);
                    Console.ReadLine();
                }
            }
        }
View Code

  運行這三個項目,執行結果如下:

2 fanout類型

1 fanout路由規則

  fanout類型的exchange路由規則是最簡單的,交換機會把消息廣播到與該Exchange綁定的所有queue中,即所有和該exchange綁定的隊列都會收到消息。fanout類型exchange和隊列綁定時不需要指定routingKey,即使指定了routingKey也會被忽略掉。路由結構如下:

  fanout類型交換機主要用於發布/訂閱的一些場景,如用戶注冊了我們的網站后,我們通過短信和郵件兩種方式通知用戶

2 代碼示例

   這里通過代碼簡單演示將消息同時使用短信和郵件兩種方式通知用戶的流程。首先聲明一個fanout類型的exchange,然后聲明兩個隊列 SMSqueue和EMAILqueue,這兩個隊列都和這個exchange綁定。消費者1處理EMAILqueue的消息,通過郵件方式發送通知;消費者2處理SMSqueue的消息通過短信方式發送通知。

  生產者發送信息,代碼如下:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在設備ip,這里就是本機
                HostName = "127.0.0.1",
                UserName = "wyy",//用戶名
                Password = "123321"//密碼
            };
            //第一步:創建連接connection
            using (var connection = factory.CreateConnection())
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    //聲明交換機exchang
                    channel.ExchangeDeclare(exchange: "myfanoutexchange",
                                            type: ExchangeType.Fanout,
                                            durable: true,
                                            autoDelete: false,
                                            arguments: null);
                    //聲明SMSqueue隊列,用於短信通知
                    channel.QueueDeclare(queue: "SMSqueue",
                                        durable: true,
                                        exclusive: false,
                                        autoDelete: false,
                                        arguments: null);
                    //聲明隊列,Email隊列,用於郵件通知
                    channel.QueueDeclare(queue: "EMAILqueue",
                                        durable: true,
                                        exclusive: false,
                                        autoDelete: false,
                                        arguments: null);

                    //綁定exchange和queue
                    channel.QueueBind(queue: "SMSqueue", exchange: "myfanoutexchange", routingKey: string.Empty,arguments:null);
                    channel.QueueBind(queue: "EMAILqueue", exchange: "myfanoutexchange", routingKey: string.Empty, arguments: null);
                    Console.WriteLine("生產者准備就緒....");
                   
                    string message = "";
                    //第六步:發送消息
                    //在控制台輸入消息,按enter鍵發送消息
                    while (!message.Equals("quit", StringComparison.CurrentCultureIgnoreCase))
                    {
                        message = Console.ReadLine();
                        var body = Encoding.UTF8.GetBytes(message);
                        //基本發布
                        channel.BasicPublish(exchange: "myfanoutexchange",
                                             routingKey: string.Empty,
                                             basicProperties: null,
                                             body: body);
                        Console.WriteLine($"消息【{message}】已發送到隊列");
                    }
                }
            }
            Console.ReadKey();
        }
View Code

  消費者1將EMAILqueue的消息通過郵件方式發送通知,代碼如下:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在設備ip,這里就是本機
                HostName = "127.0.0.1",
                UserName = "wyy",//用戶名
                Password = "123321"//密碼
            };
            //創建連接connection
            using (var connection = factory.CreateConnection())
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    //聲明交換機exchang
                    channel.ExchangeDeclare(exchange: "myfanoutexchange",
                                            type: ExchangeType.Fanout,
                                            durable: true,
                                            autoDelete: false,
                                            arguments: null);
                    //聲明隊列queue
                    channel.QueueDeclare(queue: "EMAILqueue",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);

                    //綁定exchange和queue
                    channel.QueueBind(queue: "EMAILqueue", exchange: "myfanoutexchange", routingKey: string.Empty, arguments: null);
                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        //只是為了演示,並沒有存入文本文件
                        Console.WriteLine($"接收成功!【{message}】,郵件通知");
                    };
                    Console.WriteLine("郵件通知服務准備就緒...");
                    //處理消息
                    channel.BasicConsume(queue: "EMAILqueue",
                                         autoAck: true,
                                         consumer: consumer);
                    Console.ReadLine();
                }
            }
        }
View Code

  消費者2將SMSqueue的消息通過短信方式發送通知,代碼如下:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在設備ip,這里就是本機
                HostName = "127.0.0.1",
                UserName = "wyy",//用戶名
                Password = "123321"//密碼
            };
            //創建連接connection
            using (var connection = factory.CreateConnection())
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    //聲明交換機exchang
                    channel.ExchangeDeclare(exchange: "myfanoutexchange",
                                            type: ExchangeType.Fanout,
                                            durable: true,
                                            autoDelete: false,
                                            arguments: null);
                    //聲明隊列queue
                    channel.QueueDeclare(queue: "SMSqueue",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);

                    //綁定exchange和queue
                    channel.QueueBind(queue: "SMSqueue", exchange: "myfanoutexchange", routingKey: string.Empty,arguments:null);
                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        //只是為了演示,並沒有存入文本文件
                        Console.WriteLine($"接收成功!【{message}】,短信通知");
                    };
                    Console.WriteLine("短信通知服務准備就緒...");
                    //處理消息
                    channel.BasicConsume(queue: "myfanoutqueue",
                                         autoAck: true,
                                         consumer: consumer);
                    Console.ReadLine();
                }
            }
        }
View Code

  啟動這三個應用程序,執行結果如下:

3 topic類型

1 topic路由規則

  topic類型exchange的路由規則也是基於routingKey和bindingKey的,其路由過程和direct類型基本一致,兩者的區別在於direct類型的exchange要求routingKey和bindingKey必須相同才進行將消息路由到綁定的queue中,而topic類型的bindingKey是一個匹配規則,只要routingKey符合bindingKey的規則就可以將消息路由到綁定的queue中去,結構如下圖所示。注意routingKey和bindingKey的結構都是一系列由點號連接單詞的字符串,例如【aaa.bbb.ccc】。

  bindingKey的兩個特殊符號:*表示一個單詞,#表示0或多個單詞(注意是單詞,而不是字符)。如下圖,usa.news和usa.weather都和usa.#匹配,而usa.news和europe.news都和#.news匹配。

  

2 代碼實現

  這里使用代碼實現上圖中的例子,為了方便我們只定義兩個隊列:接收美國相關信息的usaQueue(bindingKey是usa.#)和接收新聞消息的newsQueue(bindingKey是#.news)。然后定義兩個消費者,消費者1處理useaQueue的消息,消費者2處理newsQueue的消息。

  生產者代碼:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在設備ip,這里就是本機
                HostName = "127.0.0.1",
                UserName = "wyy",//用戶名
                Password = "123321"//密碼
            };
            //創建連接connection
            using (var connection = factory.CreateConnection())
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    //聲明交換機exchang
                    channel.ExchangeDeclare(exchange: "mytopicExchange",
                                            type: ExchangeType.Topic,
                                            durable: true,
                                            autoDelete: false,
                                            arguments: null);
                    //聲明隊列usaQueue
                    channel.QueueDeclare(queue: "usaQueue",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                    //聲明隊列newsQueue
                    channel.QueueDeclare(queue: "newsQueue",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                    Console.WriteLine("生產者准備就緒....");
                    //綁定usaQueue隊列到交互機,routingKey為usa.#
                    channel.QueueBind(queue: "usaQueue", exchange: "mytopicExchange", routingKey: "usa.#", arguments: null);
                    //綁定newsQueue隊列到交互機,routingKey為#.news
                    channel.QueueBind(queue: "newsQueue", exchange: "mytopicExchange", routingKey: "#.news", arguments: null);

                   ////--------------------開始發送消息
                   //1.發送美國新聞消息
                    string message1 = "美國新聞消息:內容balabala";
                    var body1 = Encoding.UTF8.GetBytes(message1);
                    channel.BasicPublish(exchange: "mytopicExchange",
                                         routingKey: "usa.news",
                                         basicProperties: null,
                                         body: body1);
                    Console.WriteLine($"消息【{message1}】已發送到隊列");

                    //2.發送美國天氣消息
                    string message2 = "美國天氣消息:內容balabala";
                    var body2 = Encoding.UTF8.GetBytes(message2);
                    channel.BasicPublish(exchange: "mytopicExchange",
                                         routingKey: "usa.weather",
                                         basicProperties: null,
                                         body: body2);
                    Console.WriteLine($"消息【{message2}】已發送到隊列");
                    //3.發送歐洲新聞消息
                    string message3 = "歐洲新聞消息:內容balabala";
                    var body3 = Encoding.UTF8.GetBytes(message3);
                    channel.BasicPublish(exchange: "mytopicExchange",
                                         routingKey: "europe.news",
                                         basicProperties: null,
                                         body: body3);
                    Console.WriteLine($"消息【{message3}】已發送到隊列");

                    //4.發送歐洲天氣消息
                    string message4 = "歐洲天氣消息:內容balabala";
                    var body4 = Encoding.UTF8.GetBytes(message4);
                    //基本發布
                    channel.BasicPublish(exchange: "mytopicExchange",
                                         routingKey: "europe.weather",
                                         basicProperties: null,
                                         body: body4);
                    Console.WriteLine($"消息【{message4}】已發送到隊列");
                }
            }
            Console.ReadKey();
        }
View Code

  消費者1代碼,只處理usaQueue中的消息:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在設備ip,這里就是本機
                HostName = "127.0.0.1",
                UserName = "wyy",//用戶名
                Password = "123321"//密碼
            };
            //創建連接connection
            using (var connection = factory.CreateConnection())
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    //聲明交換機exchang
                    channel.ExchangeDeclare(exchange: "mytopicExchange",
                                            type: ExchangeType.Topic,
                                            durable: true,
                                            autoDelete: false,
                                            arguments: null);
                    //聲明隊列queue
                    channel.QueueDeclare(queue: "usaQueue",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                    Console.WriteLine("usaQueue消費者准備就緒....");
                    //綁定usaQueue隊列到交互機
                    channel.QueueBind(queue: "usaQueue", exchange: "mytopicExchange", routingKey: "usa.#", arguments: null);
                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine($"接收成功!【{message}】");
                    };
                    //處理消息
                    channel.BasicConsume(queue: "usaQueue",
                                         autoAck: true,
                                         consumer: consumer);
                    Console.ReadLine();
                }
            }
        }
    
View Code

  消費者2代碼,只處理newsQueue中的消息:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在設備ip,這里就是本機
                HostName = "127.0.0.1",
                UserName = "wyy",//用戶名
                Password = "123321"//密碼
            };
            //創建連接connection
            using (var connection = factory.CreateConnection())
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    //聲明交換機exchang
                    channel.ExchangeDeclare(exchange: "mytopicExchange",
                                            type: ExchangeType.Topic,
                                            durable: true,
                                            autoDelete: false,
                                            arguments: null);
                    //聲明隊列queue
                    channel.QueueDeclare(queue: "newsQueue",
                                         durable: true,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                    Console.WriteLine("newsQueue消費者准備就緒....");
                    //綁定usaQueue隊列到交互機
                    channel.QueueBind(queue: "newsQueue", exchange: "mytopicExchange", routingKey: "#.news", arguments: null);
                    //定義消費者                                      
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        Console.WriteLine($"接收成功!【{message}】");
                    };
                    //處理消息
                    channel.BasicConsume(queue: "newsQueue",
                                         autoAck: true,
                                         consumer: consumer);
                    Console.ReadLine();
                }
            }
        }
View Code

  生成者發送的四條消息中,消息1的routingKey為usa.news,同時符合usaQueue的bindingKey(usa.#)和newsQueue的bindingKey(#.news),所以消息1同時路由到兩個隊列中;消息2的routingKey為usa.weather只符合usa.#,發送到usaQueue;消息的rouKey為europe.news,只符合#.news,發送到newsQueue中;消息4的routingKey為europe.weahter,和兩個隊列的bindingKey都不符合,所以被丟棄。執行這三個Console應用程序,結果如下:

 

  一點補充:topic類型交換機十分靈活,可以輕松實現direct和fanout類型交換機的功能。如果綁定隊列時所有的bindingKey都是#,則交換機和fanout類型交換機表現一致;如果所有的bindingKey都不包含*和#,則交換機和direct類型交換機表現一致。

4  header類型

1 header路由規則

  header類型路由規則和上邊的幾種exchange都不一樣,header類型exchange不是通過routingKey進行路由的,而是通過Headers。exchange在和queue進行binding時可以設置arguments:

                   channel.QueueBind(queue: "Allqueue", 
                        exchange: "myheaderExchange", 
                        routingKey: string.Empty, 
                        arguments: new Dictionary<string, object> { { "x-match","all"}, { "user","jack"}, { "pass","123"}
              
});

  將消息發送到exchange時可以設置消息的Header:

                   var props1 = channel.CreateBasicProperties();
                    props1.Headers = new Dictionary<string, object>() {
                         { "user","jack"},
                         { "pass","123"}
                    };
                    var body1 = Encoding.UTF8.GetBytes(msg1);
                    //發送消息
                    channel.BasicPublish(exchange: "myheaderExchange",
                                         routingKey: string.Empty,
                                         basicProperties: props1,
                                         body: body1);

  user和pass是普通的鍵值對,我們也可以設置其他的鍵值對。x-match是一個特殊的屬性,當x-match為all時,agumentsbasicProrperties.Headers的所有鍵值對都相等時才會路由到queue(AND關系);當x-match為any時,aguments和basicProrperties.Headers的鍵值對只要有一個相同就可以路由到queue(OR關系)。

2.代碼示例

  看一個簡單的栗子,創建兩個隊列Allqueue和Anyqueue,其中Allqueue和exchange綁定時的x-match為all,Anyqueue和exchange綁定時的x-match為any;然后發送兩條消息,發送第一條消息時basicProperties.Headers中的user和pass都和綁定隊列時的agruments的user和pass相等,發送第二條消息是兩者的pass不相等,代碼如下:

        static void Main(string[] args)
        {
            var factory = new ConnectionFactory()
            {
                //rabbitmq-server所在設備ip,這里就是本機
                HostName = "127.0.0.1",
                UserName = "wyy",//用戶名
                Password = "123321"//密碼
            };
            //創建連接connection
            using (var connection = factory.CreateConnection())
            {
                //創建通道channel
                using (var channel = connection.CreateModel())
                {
                    //聲明交換機exchang
                    channel.ExchangeDeclare(exchange: "myheaderExchange",
                                            type: ExchangeType.Headers,
                                            durable: true,
                                            autoDelete: false,
                                            arguments: null);
                    //聲明Allqueue隊列
                    channel.QueueDeclare(queue: "Allqueue",
                                        durable: true,
                                        exclusive: false,
                                        autoDelete: false,
                                        arguments: null);
                    //聲明Anyqueue隊列
                    channel.QueueDeclare(queue: "Anyqueue",
                                        durable: true,
                                        exclusive: false,
                                        autoDelete: false,
                                        arguments: null);
                    Console.WriteLine("生產者准備就緒....");
                   
                    
                    //////發送消息消息1,user和pass都相同
                    //綁定exchange和Allqueue
                    channel.QueueBind(queue: "Allqueue", 
                        exchange: "myheaderExchange", 
                        routingKey: string.Empty, 
                        arguments: new Dictionary<string, object> {
                                            { "x-match","all"},
                                            { "user","jack"},
                                            { "pass","123"}});
                    string msg1 = "user和pass都相同時發送的消息";
                    var props1 = channel.CreateBasicProperties();
                    props1.Headers = new Dictionary<string, object>() {
                         { "user","jack"},
                        { "pass","123"}
                    };
                    var body1 = Encoding.UTF8.GetBytes(msg1);
                    //基本發布
                    channel.BasicPublish(exchange: "myheaderExchange",
                                         routingKey: string.Empty,
                                         basicProperties: props1,
                                         body: body1);
                    Console.WriteLine($"消息【{msg1}】已發送到隊列");
                    
                    
                    //////發送消息消息2,user和pass不完全相同
                    //綁定exchange和Anyqueue
                    channel.QueueBind(queue: "Anyqueue", 
                        exchange: "myheaderExchange", 
                        routingKey: string.Empty, 
                        arguments: new Dictionary<string, object> {
                                            { "x-match","any"},
                                            { "user","jack"},
                                            { "pass","123"},});
                  
                    string msg2 = "user和pass不完全相同時發送的消息";
                    var props2 = channel.CreateBasicProperties();
                    props2.Headers = new Dictionary<string, object>() {
                         { "user","jack"},
                         { "pass","456"}//這里的pass和BindQueue方法的中argumens中的pass不相同
                    };
                    var body2 = Encoding.UTF8.GetBytes(msg2);
                    //基本發布
                    channel.BasicPublish(exchange: "myheaderExchange",
                                         routingKey: string.Empty,
                                         basicProperties: props2,
                                         body: body2);
                    Console.WriteLine($"消息【{msg2}】已發送到隊列");

                }
            }
            Console.ReadKey();
        }
    }
View Code

  執行程序,打開WebUI管理界面,結果如下,我們看到只有user和pass都相等時消息才會路由到Allqueue;user和pass只要有一個相等就會路由到Anyqueue

5 小結

  RabbitMQ的交換機(exchange)的作用是路由消息,我們可以根據應用場景的不同選擇合適的交換機。如果需要精准路由到隊列,或者對消息進行單一維度分類(只對日志的嚴重程度這一維度進行分類)可以使用direct類型交換機;如果需要廣播消息,可以使用fanout類型的交換機;如果對消息進行多維度分類(如例子中按照地區和消息內容類型兩個維度進行分類)使用topic類型的交換機;如果消息歸類的邏輯包含了較多的AND/OR邏輯判斷可以使用header類型交換機(開發中很少用到Header類型,官網上關於Header類型的介紹也不多)。

 

【參考文章】

1. https://www.cnblogs.com/zhangweizhong/p/5713874.html

2.https://blog.csdn.net/ctwy291314/article/details/83147194


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM