理清路由機制是了解RabbitMQ來龍去脈的關鍵。在前面的例子中我們常常遇見這三個概念:exchange,routingKey 和 queue。真正地消息傳輸流程是消息先到exchange,然后exchange根據對應的routingKey放入queue,如果routingKey不匹配則丟棄。網上網友的一張圖很好的展示了這個流程:
0.9 版本的AMQP協議的exchange有如下4中類型:fanout,direct,topic 和 headers。RabbitMQ服務會在啟動以后預先建立4個exchange,分別對應於4中類型:
默認的exchange
如果用空字符串去申明一個exchange,那么系統就會使用"amq.direct"這個exchange。我們在創建一個queue的時候,默認的都會有一個和新建queue同名的routingKey綁定到這個默認的exchange上去。就像我們在發送和接收例子中,發送者的發送代碼:
channel.BasicPublish("", "TaskQueue", properties, bytes);
因為在第一個參數選擇了默認的exchange,而我們申明的隊列叫TaskQueue,所以默認的,它在新建一個也叫TaskQueue的routingKey,並綁定在默認的exchange上,導致了我們可以在第二個參數routingKey中寫TaskQueue,這樣它就會找到定義的同名的queue,並把消息放進去。
Direct Exchange
direct exchange 發送消息是要看routingKey的。舉個例子,定義了一個direct exchange 名字是X1,然后一個queue名字為Q1 用routingKey=K1 綁定到exchange X1上,當一個routeKey為 K2 的消息到達X1上,那么只有K1=K2的時候,這個消息才能到達Q1上。代碼例子如下:
channel.ExchangeDeclare("X1", "direct"); channel.QueueDeclare("Q1", true, false, false, null); channel.QueueBind("Q1", "X1", "K1"); channel.BasicPublish("X1", "K1", null, bytes);
這樣Q1才能收到消息。
如果有兩個接收程序都是用了同一個的queue和相同的routingKey去綁定direct exchange的話,分發的行為是負載均衡的,也就是說第一個是程序1收到,第二個是程序2收到,以此類推。
如果有兩個接收程序用了各自的queue,但使用相同的routingKey去綁定direct exchange的話,分發的行為是復制的,也就是說每個程序都會收到這個消息的副本。行為相當於fanout類型的exchange。
官方圖片展示direct exchange:
Fanout Exchange
fanout類型的exchange就比較好理解。就是簡單的廣播,而且是忽略routingKey的。所以只要是有queue綁定到fanout exchange上,通過這個exchange發送的消息都會被發送到那些綁定的queue中,不管你有沒有輸入routingKey。
您可以需要廣播消息或分布式系統的消息同步等場景中廣泛使用到它。
官方圖片展示:
Topic Exchange
Topic類型的exchange給與我們更大的靈活性。通過定義routingKey可以有選擇的訂閱某些消息,此時routingKey就會是一個表達式。exchange會通過匹配綁定的routingKey來決定是否要把消息放入對應的隊列中。有兩種表達式符號可以讓我們選擇:#和*。
*(星號):代表任意的一個詞。 例:*.a會匹配a.a,b.a,c.a等
#(井號):代碼任意的0個或多個詞。 例:#.a會匹配a.a,aa.a,aaa.a等
topic exchange 有時候的行為會像其他類型的exchange,比如說:
當routingKey只是有#號的時候,它的行為和fanout的行為是一樣的。
當routingKey什么的沒有,空字符串的時候,它的行為是和direct是一樣的。
要注意的是,符號代表的是詞不是字符。RabbitMQ中在表達式中詞的定義是以.(點號)分隔的。
對於一個queue綁定到exchange,是可以多次綁定的:
channel.QueueBind(queue_name, "X1", "lazy.#"); channel.QueueBind(queue_name, "X1", "*.*.rabbit");
如上面的代碼,表示這個queue即可以收lazy.開頭的,又可以收.rabbit結尾的。
大家可以通過一些測試來理解這里面的規則,但是在測試的時候需要注意,每次修改表達式后,需要重置一下RabbitMQ。因為是可以多次綁定的,所以之前你改的所有的表達式都會記錄下來,都會被嘗試匹配。
重置的命令(RabbitMQ以控制台的形式運行):
Headers Exchange
Headers類型的exchange使用的比較少,它也是忽略routingKey的一種路由方式。是使用Headers來匹配的。Headers是一個鍵值對,可以定義成Hashtable。發送者在發送的時候定義一些鍵值對,接收者也可以再綁定時候傳入一些鍵值對,兩者匹配的話,則對應的隊列就可以收到消息。匹配有兩種方式all和any。這兩種方式是在接收端必須要用鍵值"x-mactch"來定義。all代表定義的多個鍵值對都要滿足,而any則代碼只要滿足一個就可以了。之前的幾種exchange的routingKey都需要要字符串形式的,而headers exchange則沒有這個要求,因為鍵值對的值可以是任何類型。代碼示例如下:
發送端:
channel.ExchangeDeclare("X1", "headers");
IBasicProperties properties = channel.CreateBasicProperties();
properties.Headers = new Hashtable();
properties.Headers.Add("Key1", 123);
properties.Headers.Add("Key2", 345);
XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
MemoryStream ms = new MemoryStream();
xs.Serialize(ms, message);
byte[] bytes = ms.ToArray();
channel.BasicPublish("X1", "", properties, bytes);
接收端:
channel.ExchangeDeclare("X1", "headers");
//隨機創建一個隊列
string queue_name = channel.QueueDeclare("headerssubscriber2", true, false, false, null);
//綁定
IDictionary ht = new Hashtable();
ht.Add("x-match", "any");
ht.Add("Key1", 12345);
ht.Add("Key2", 34567);
channel.QueueBind(queue_name, "X1", "", ht);
//定義這個隊列的消費者
QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queue_name, true, consumer);
while (true)
{
BasicDeliverEventArgs ea =
(BasicDeliverEventArgs)consumer.Queue.Dequeue();
byte[] bytes = ea.Body;
XmlSerializer xs = new XmlSerializer(typeof(RequestMessage));
using (MemoryStream ms = new MemoryStream(bytes))
{
RequestMessage message = (RequestMessage)xs.Deserialize(ms);
Console.WriteLine("Receive a Message, Id:" + message.MessageId + " Message:" + message.Message);
}
}
點擊這里下載測試代碼