RabbitMQ消息隊列(九)-通過Headers模式分發消息(.Net Core版)


Headers類型的exchange使用的比較少,以至於官方文檔貌似都沒提到,它是忽略routingKey的一種路由方式。是使用Headers來匹配的。Headers是一個鍵值對,可以定義成Hashtable。發送者在發送的時候定義一些鍵值對,接收者也可以再綁定時候傳入一些鍵值對,兩者匹配的話,則對應的隊列就可以收到消息。 

匹配有兩種方式all和any。這兩種方式是在接收端必須要用鍵值”x-mactch”來定義。all代表定義的多個鍵值對都要滿足,any代表只要滿足一個就可以。fanout,direct,topic exchange的routingKey都需要要字符串形式的,而headers exchange則沒有這個要求,因為鍵值對的值可以是任何類型。 

那在.Net Core中怎么應用呢? 

 

headers也是一種交換機類型,但是在rabbitmq官網中的教程中並沒有說到。資料也很少,但是找一找總會有的。

headers與direct的模式不同,不是使用routingkey去做綁定。而是通過消息headers的鍵值對匹配

發布者  -- >  headersexchange  -->  (user:  “小明 ”) binding  --> queue

也就是說 user: 小明 替代了之前的routingkey。在做綁定的時候有兩種匹配方式供選擇。x-match (all/any)

意思就是鍵值對中所有的項都要匹配與只要有一個匹配就可以。下面就可以動手寫代碼了

新建HeadersProduct用來發布新消息:

using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace HeadersProduct
{
    class Program
    {
        static void Main(string[] args)
        {
            String exchangeName = "wytExchangeHeaders";

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.63.129";
            factory.Port = 5672;
            factory.VirtualHost = "/wyt";
            factory.UserName = "wyt";
            factory.Password = "wyt";

            using (IConnection connection=factory.CreateConnection())
            {
                using (IModel channel=connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: exchangeName, type: "headers", durable: true, autoDelete: false, arguments: null);

                    IBasicProperties properties = channel.CreateBasicProperties();
                    properties.Headers = new Dictionary<String, Object>()
                    {
                        {"user","wyt" },
                        {"password","wyt"}
                    };

                    Byte[] body = Encoding.UTF8.GetBytes("Hello World");

                    channel.BasicPublish(exchange: exchangeName, routingKey: String.Empty, basicProperties: properties, body: body);
                }
            }

            Console.Write("發布成功!");

            Console.ReadKey();
        }
    }
}
View Code

新建HeadersConsumerA來以正確的headers接收消息:

using System;
using System.Collections.Generic;
using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace HeadersConsumerA
{
    class Program
    {
        static void Main(string[] args)
        {



            String exchangeName = "wytExchangeHeaders";

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.63.129";
            factory.Port = 5672;
            factory.VirtualHost = "/wyt";
            factory.UserName = "wyt";
            factory.Password = "wyt";

            bool flag = true;
            string pattern = "";
            while (flag)
            {
                Console.WriteLine("請選擇headers匹配模式  1(any)/2(all)");
                pattern = Console.ReadLine();
                if (pattern == "1" || pattern == "2")
                    flag = false;
                else
                    Console.Write("請做出正確的選擇");
            }
            //根據聲明使用的隊列
            var headersType = pattern == "1" ? "any" : "all";

            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Headers, durable: true, autoDelete: false, arguments: null);

                    String queueName = channel.QueueDeclare().QueueName;

                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: String.Empty, arguments: new Dictionary<String, Object>
                    {
                        {"x-math",headersType },
                        {"user","wyt" },
                        {"password","wyt" }
                    });

                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var msg = Encoding.UTF8.GetString(ea.Body);

                        Console.WriteLine($"{msg}");
                    };

                    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);


                    Console.ReadKey();

                }
            }
        }
    }
}
View Code

新建HeadersConsumerB來以錯誤的headers接收消息:

using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Collections.Generic;
using System.Text;

namespace HeadersConsumerB
{
    class Program
    {
        static void Main(string[] args)
        {
            String exchangeName = "wytExchangeHeaders";

            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "192.168.63.129";
            factory.Port = 5672;
            factory.VirtualHost = "/wyt";
            factory.UserName = "wyt";
            factory.Password = "wyt";

            bool flag = true;
            string pattern = "";
            while (flag)
            {
                Console.WriteLine("請選擇headers匹配模式  1(any)/2(all)");
                pattern = Console.ReadLine();
                if (pattern == "1" || pattern == "2")
                    flag = false;
                else
                    Console.Write("請做出正確的選擇");
            }
            //根據聲明使用的隊列
            var headersType = pattern == "1" ? "any" : "all";

            using (IConnection connection = factory.CreateConnection())
            {
                using (IModel channel = connection.CreateModel())
                {
                    channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Headers, durable: true, autoDelete: false, arguments: null);

                    String queueName = channel.QueueDeclare().QueueName;

                    channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: String.Empty, arguments: new Dictionary<String, Object>
                    {
                        {"x-math",headersType },
                        {"user","xxx" },
                        {"password","xxx" }
                    });

                    EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var msg = Encoding.UTF8.GetString(ea.Body);

                        Console.WriteLine($"{msg}");
                    };

                    channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);


                    Console.ReadKey();

                }
            }

        }
    }
}
View Code

運行結果

一個可以接收到消息,另一個由於headers是不匹配的,所有接收不到消息

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM