C#利用RabbitMQ實現消息訂閱與發布


在消息隊列模型中,如何將消息廣播到所有的消費者,這種模式成為“發布/訂閱”。本文主要以一個簡單的小例子,簡述通過fanout交換機,實現消息的發布與訂閱,僅供學習分享使用,如有不足之處,還請指正。

Fanout交換機模型

扇形交換機,采用廣播模式,根據綁定的交換機,路由到與之對應的所有隊列。一個發送到交換機的消息都會被轉發到與該交換機綁定的所有隊列上。很像子網廣播,每台子網內的主機都獲得了一份復制的消息。Fanout交換機轉發消息是最快的。

RabbitMQ控制台操作

新增兩個隊列

在同一個Virtual host下新增兩個隊列Q1,Q2,如下圖所示:

綁定fanout交換機

將兩個隊列綁定到系統默認的fanout交換機,如下所示:

示例效果圖

生產者,采用Fanout類型交換機發布消息,如下圖所示:

 

 當生產者發布 一條消息時,Q1,Q2兩個隊列均會收到,如下圖所示:

當啟動消費者后,兩個消費者,均會訂閱到相關消息,如下圖所示:

核心代碼

消息發布

建立連接后,將通道聲明類型為Fanout的交換機,如下所示:

 1     /// <summary>
 2     /// fanout類型交換機,發送消息
 3     /// </summary>
 4     public class RabbitMqFanoutSendHelper : RabbitMqHelper {
 5         /// <summary>
 6         /// 發送消息
 7         /// </summary>
 8         /// <param name="msg"></param>
 9         /// <returns></returns>
10         public bool SendMsg(string msg)
11         {
12             try
13             {
14                 using (var conn = GetConnection("/Alan.hsiang"))
15                 {
16                     using (var channel = conn.CreateModel())
17                     {
18                         channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
19                         
20                         var body = Encoding.UTF8.GetBytes(msg);
21 
22                         channel.BasicPublish(exchange: "amq.fanout",
23                                              routingKey: "",
24                                              basicProperties: null,
25                                              body: body);
26 
27                         //Console.WriteLine(" [x] Sent {0}", message);
28                     };
29                 };
30                 return true;
31             }
32             catch (Exception ex)
33             {
34                 throw ex;
35             }
36         }
37     }

消息訂閱

建立連接后,通道聲明類型為Fanout的交換機,並綁定隊列進行訂閱,如下所示:

 1    /// <summary>
 2     /// 扇形交換機接收消息
 3     /// </summary>
 4     public class RabbitMqFanoutReceiveHelper : RabbitMqHelper
 5     {
 6         public RabbitMqReceiveEventHandler OnReceiveEvent;
 7 
 8         private IConnection conn;
 9 
10         private IModel channel;
11 
12         private EventingBasicConsumer consumer;
13 
14         public bool StartReceiveMsg(string queueName)
15         {
16             try
17             {
18                 conn = GetConnection("/Alan.hsiang");
19 
20                 channel = conn.CreateModel();
21                 channel.ExchangeDeclare(exchange: "amq.fanout", type: ExchangeType.Fanout,durable:true);
22                 //此處隨機取出交換機下的隊列
23                 //var queueName = channel.QueueDeclare().QueueName;
24                 channel.QueueBind(queue: queueName, exchange: "amq.fanout", routingKey: "");
25                 consumer = new EventingBasicConsumer(channel);
26                 consumer.Received += (model, ea) =>
27                 {
28                     var body = ea.Body.ToArray();
29                     var message = Encoding.UTF8.GetString(body);
30                     //Console.WriteLine(" [x] Received {0}", message);
31                     if (OnReceiveEvent != null)
32                     {
33                         OnReceiveEvent(queueName+"::"+message);
34                     }
35                 };
36                 channel.BasicConsume(queue: queueName,
37                                         autoAck: true,
38                                         consumer: consumer);
39                 return true;
40             }
41             catch (Exception ex)
42             {
43                 throw ex;
44             }
45         }
46     }

關於RabbitMQ的基礎知識介紹,可參考前幾篇博文。

備注

遣懷

唐代  [杜牧]

落魄江湖載酒行,楚腰纖細掌中輕。
十年一覺揚州夢,贏得青樓薄幸名。 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM