Headers類型的exchange使用的比較少,以至於官方文檔貌似都沒提到,它是忽略routingKey的一種路由方式。是使用Headers來匹配的。Headers是一個鍵值對,可以定義成Hashtable。發送者在發送的時候定義一些鍵值對,接收者也可以再綁定時候傳入一些鍵值對,兩者匹配的話,則對應的隊列就可以收到消息。
匹配有兩種方式all和any。這兩種方式是在接收端必須要用鍵值”x-mactch”來定義。all代表定義的多個鍵值對都要滿足,any代表只要滿足一個就可以。fanout,direct,topic exchange的routingKey都需要要字符串形式的,而headers exchange則沒有這個要求,因為鍵值對的值可以是任何類型。
那在.Net Core中怎么應用呢?
headers也是一種交換機類型,但是在rabbitmq官網中的教程中並沒有說到。資料也很少,但是找一找總會有的。
headers與direct的模式不同,不是使用routingkey去做綁定。而是通過消息headers的鍵值對匹配
發布者 -- > headersexchange --> (user: “小明 ”) binding --> queue
也就是說 user: 小明 替代了之前的routingkey。在做綁定的時候有兩種匹配方式供選擇。x-match (all/any)
意思就是鍵值對中所有的項都要匹配與只要有一個匹配就可以。下面就可以動手寫代碼了
新建HeadersProduct用來發布新消息:

using System; using System.Collections.Generic; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace HeadersProduct { class Program { static void Main(string[] args) { String exchangeName = "wytExchangeHeaders"; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.63.129"; factory.Port = 5672; factory.VirtualHost = "/wyt"; factory.UserName = "wyt"; factory.Password = "wyt"; using (IConnection connection=factory.CreateConnection()) { using (IModel channel=connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: "headers", durable: true, autoDelete: false, arguments: null); IBasicProperties properties = channel.CreateBasicProperties(); properties.Headers = new Dictionary<String, Object>() { {"user","wyt" }, {"password","wyt"} }; Byte[] body = Encoding.UTF8.GetBytes("Hello World"); channel.BasicPublish(exchange: exchangeName, routingKey: String.Empty, basicProperties: properties, body: body); } } Console.Write("發布成功!"); Console.ReadKey(); } } }
新建HeadersConsumerA來以正確的headers接收消息:

using System; using System.Collections.Generic; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace HeadersConsumerA { class Program { static void Main(string[] args) { String exchangeName = "wytExchangeHeaders"; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.63.129"; factory.Port = 5672; factory.VirtualHost = "/wyt"; factory.UserName = "wyt"; factory.Password = "wyt"; bool flag = true; string pattern = ""; while (flag) { Console.WriteLine("請選擇headers匹配模式 1(any)/2(all)"); pattern = Console.ReadLine(); if (pattern == "1" || pattern == "2") flag = false; else Console.Write("請做出正確的選擇"); } //根據聲明使用的隊列 var headersType = pattern == "1" ? "any" : "all"; using (IConnection connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Headers, durable: true, autoDelete: false, arguments: null); String queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: String.Empty, arguments: new Dictionary<String, Object> { {"x-math",headersType }, {"user","wyt" }, {"password","wyt" } }); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var msg = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"{msg}"); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.ReadKey(); } } } } }
新建HeadersConsumerB來以錯誤的headers接收消息:

using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Collections.Generic; using System.Text; namespace HeadersConsumerB { class Program { static void Main(string[] args) { String exchangeName = "wytExchangeHeaders"; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.63.129"; factory.Port = 5672; factory.VirtualHost = "/wyt"; factory.UserName = "wyt"; factory.Password = "wyt"; bool flag = true; string pattern = ""; while (flag) { Console.WriteLine("請選擇headers匹配模式 1(any)/2(all)"); pattern = Console.ReadLine(); if (pattern == "1" || pattern == "2") flag = false; else Console.Write("請做出正確的選擇"); } //根據聲明使用的隊列 var headersType = pattern == "1" ? "any" : "all"; using (IConnection connection = factory.CreateConnection()) { using (IModel channel = connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Headers, durable: true, autoDelete: false, arguments: null); String queueName = channel.QueueDeclare().QueueName; channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: String.Empty, arguments: new Dictionary<String, Object> { {"x-math",headersType }, {"user","xxx" }, {"password","xxx" } }); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var msg = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"{msg}"); }; channel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer); Console.ReadKey(); } } } } }
運行結果
一個可以接收到消息,另一個由於headers是不匹配的,所有接收不到消息