上篇文章中,我們把每個Message都是deliver(提供)到某個Consumer。在這篇文章中,我們將會將同一個Message deliver(提供)到多個Consumer中。這個模式也被成為 "publish / subscribe"。
這篇文章中,我們將創建一個日志系統,它包含兩個部分:第一個部分是發出log(Producer),第二個部分接收到並打印(Consumer)。 我們將構建兩個Consumer,第一個將log寫到物理磁盤上;第二個將log輸出的屏幕。
1. Exchanges
關於exchange的概念在《RabbitMQ消息隊列(一): Detailed Introduction 詳細介紹》中有詳細介紹。現在做一下簡單的回顧。
RabbitMQ 的Messaging Model就是Producer並不會直接發送Message到queue。實際上,Producer並不知道它發送的Message是否已經到達queue。
Producer發送的Message實際上是發到了Exchange中。Exchange的功能也很簡單:從Producer接收Message,然后投遞到queue中。Exchange需要知道如何處理Message,是把它放到一個queue中,還是放到多個queue中?這個rule是通過 Exchange 的類型定義的。

channel.ExchangeDeclare("logs", "fanout");
2. Temporary queues
截至現在,我們用的queue都是有名字的:第一個是hello,第二個是task_queue。使用有名字的queue,使得在Producer和Consumer之前共享queue成為可能。
但是對於我們將要構建的日志系統,並不需要有名字的queue。我們希望得到所有的log,而不是它們中間的一部分。而且我們只對當前的log感興趣。為了實現這個目標,我們需要兩件事情:1) 每當Consumer連接時,我們需要一個新的,空的queue。因為我們不對老的log感興趣。幸運的是,如果在聲明queue時不指定名字,那么RabbitMQ會隨機為我們選擇這個名字。基本上都是這個樣子: amq.gen-JzTY20BRgKO-HjmUJj0wLg。
2)當Consumer關閉連接時,這個queue要被deleted。可以加個exclusive的參數。方法:NET好像不用設置exclusive,Consumer關閉之后會自動刪掉Queue。
channel.QueueDeclare(queueName, durable, exclusive, autoDelete, null);
3. Bindings綁定

方法:
取名字:
channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");//廣播
QueueDeclareOk queueOk = channel.QueueDeclare();
var queueName = queueOk.QueueName;
綁定:
channel.QueueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);//不需要指定routing key,設置了fanout,指了也沒有用.
現在logs的exchange就將它的Message附加到我們創建的queue了。
1 static void Main(string[] args) 2 { 3 var factory = new ConnectionFactory() { HostName = "localhost" }; 4 using (var connection = factory.CreateConnection()) 5 { 6 using (var channel = connection.CreateModel()) 7 { 8 const string EXCHANGE_NAME = "logs"; 9 const string ROUTING_KEY = ""; 10 channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");//廣播 11 var message = GetMessage(args); 12 var body = Encoding.UTF8.GetBytes(message); 13 channel.BasicPublish(EXCHANGE_NAME, ROUTING_KEY, null, body);//不需要指定routing key,設置了fanout,指了也沒有用. 14 Console.WriteLine(" [x] Sent {0}", message); 15 } 16 } 17 }
Consumer.cs
1 static void Main(string[] args) 2 { 3 var factory = new ConnectionFactory() { HostName = "localhost" }; 4 using (var connection = factory.CreateConnection()) 5 { 6 using (var channel = connection.CreateModel()) 7 { 8 const string EXCHANGE_NAME = "logs"; 9 const string ROUTING_KEY = ""; 10 channel.ExchangeDeclare(EXCHANGE_NAME, "fanout");//廣播 11 QueueDeclareOk queueOk = channel.QueueDeclare();//每當Consumer連接時,我們需要一個新的,空的queue。因為我們不對老的log感興趣。幸運的是,如果在聲明queue時不指定名字,那么RabbitMQ會隨機為我們選擇這個名字。 12 ////現在我們已經創建了fanout類型的exchange和沒有名字的queue(實際上是RabbitMQ幫我們取了名字)。 13 ////那exchange怎么樣知道它的Message發送到哪個queue呢?答案就是通過bindings:綁定。 14 string queueName = queueOk.QueueName;//得到RabbitMQ幫我們取了名字 15 channel.QueueBind(queueName, EXCHANGE_NAME, ROUTING_KEY);//不需要指定routing key,設置了fanout,指了也沒有用. 16 var consumer = new QueueingBasicConsumer(channel); 17 channel.BasicConsume(queueName, true, consumer); 18 Console.WriteLine(" [*] Waiting for messages." + "To exit press CTRL+C"); 19 while (true) 20 { 21 var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();//掛起的操作 22 var body = ea.Body; 23 var message = Encoding.UTF8.GetString(body); 24 Console.WriteLine(" [x] Received {0}", message); 25 } 26 } 27 } 28 }
轉:
http://www.rabbitmq.com/tutorials/tutorial-three-dotnet.html(官網)
http://blog.csdn.net/anzhsoft/article/details/19617305(翻譯)