C#利用RabbitMQ實現點對點消息傳輸


RabbitMQ做為消息代理,負責接收和轉發消息,可以將RabbitMQ比喻為一個郵筒、一個郵局和一個郵遞員。本文主要以一個簡單的小例子,簡述RabbitMQ實現消息傳輸的相關內容,僅供學習分享使用,如有不足之處,還請指正。

消息隊列模型

所有 MQ 產品從模型抽象上來說都是一樣的過程:
消費者(consumer)訂閱某個隊列。生產者(producer)創建消息,然后發布到隊列(queue)中,最后將消息發送到監聽的消費者。

RabbitMQ設置

RabbitMQ是通過交換機將消息轉發到對應隊列,所以隊列需要和交換機進行綁定。本例將隊列綁定到默認的amq.direct交換機,並設置Routing key,如下圖所示:

RabbitMQ動態庫安裝

通過NuGet包管理器進行安裝RabbitMQ.Client,如下所示:

RabbitMQ.Client相關知識點

  • ConnectionFactory:構造一個實例,主要創建連接。
  • IConnection:表示一個基於AMQP協議的連接。
  • IModel:表示一個RabbitMQ通道,可用於聲明一個隊列,然后開始消費。
  • EventingBasicConsumer:基於獨立事件監聽的基礎消費者,可以監聽並接收消息。
  • 生產者基本步驟:1. 創建連接 2. 基於連接創建通道 3. 基於通道聲明隊列,4. 開始生產並發布消息
  • 消費者基本步驟:1. 創建連接 2. 基於連接創建通道 3. 基於通道聲明隊列,4. 創建消費者,5. 綁定通道和消費者,並開始消費

示例效果圖

本例主要有一個生產者,一個消費者,通過消息隊列進行消息轉發和接收。

生產者負責消息發送,如下圖所示:

消費者負責消息接收,如下圖所示:

核心代碼

代碼結構:主要包括生產者,消費者,公共基礎代碼,如下所示:

RabbitMqHelper主要創建連接,如下所示:

 1     public class RabbitMqHelper
 2     {
 3         
 4         /// <summary>
 5         /// 創建連接
 6         /// </summary>
 7         /// <returns></returns>
 8         public IConnection GetConnection()
 9         {
10             try
11             {
12                 var factory = new ConnectionFactory()
13                 {
14                     HostName = "127.0.0.1",
15                     Port = 5672,
16                     UserName = "guest",
17                     Password = "guest",
18                     VirtualHost = "/ShortMsgHost"
19                 };
20                 var conn = factory.CreateConnection();
21                 return conn;
22             }
23             catch (Exception ex) {
24                 throw ex;
25             }
26         }
27 
28    
29 
30     }

RabbmitMqSendHelper用於發送消息,如下所示:

 1     public class RabbmitMqSendHelper : RabbitMqHelper
 2     {
 3         /// <summary>
 4         /// 發送消息
 5         /// </summary>
 6         /// <param name="msg"></param>
 7         /// <returns></returns>
 8         public bool SendMsg(string msg)
 9         {
10             try
11             {
12                 using (var conn = GetConnection())
13                 {
14                     using (var channel = conn.CreateModel())
15                     {
16                         channel.QueueDeclare(queue: "ShortMsgQueue",
17                                      durable: true,
18                                      exclusive: false,
19                                      autoDelete: false,
20                                      arguments: null);
21                         var body = Encoding.UTF8.GetBytes(msg);
22 
23                         channel.BasicPublish(exchange: "amq.direct",
24                                              routingKey: "ShortMsgKey",
25                                              basicProperties: null,
26                                              body: body);
27 
28                         //Console.WriteLine(" [x] Sent {0}", message);
29                     };
30                 };
31                 return true;
32             }
33             catch (Exception ex)
34             {
35                 throw ex;
36             }
37         }
38     }

RabbitMqReceiveHelper主要用於接收信息,如下所示:

 1     public class RabbitMqReceiveHelper : RabbitMqHelper
 2     {
 3         public RabbitMqReceiveEventHandler OnReceiveEvent;
 4 
 5         private IConnection conn;
 6 
 7         private IModel channel;
 8 
 9         private EventingBasicConsumer consumer;
10 
11         public bool StartReceiveMsg()
12         {
13             try
14             {
15                 conn = GetConnection();
16 
17                 channel = conn.CreateModel();
18 
19                 channel.QueueDeclare(queue: "ShortMsgQueue",
20                                 durable: true,
21                                 exclusive: false,
22                                 autoDelete: false,
23                                 arguments: null);
24 
25                 consumer = new EventingBasicConsumer(channel);
26                 consumer.Received += (model, ea) =>
27                 {
28                     var body = ea.Body.ToArray();
29                     var message = Encoding.UTF8.GetString(body);
30                     //Console.WriteLine(" [x] Received {0}", message);
31                     if (OnReceiveEvent != null)
32                     {
33                         OnReceiveEvent(message);
34                     }
35                 };
36                 channel.BasicConsume(queue: "ShortMsgQueue",
37                                         autoAck: true,
38                                         consumer: consumer);
39                 return true;
40             }
41             catch (Exception ex)
42             {
43                 throw ex;
44             }
45         }
46     }

關於RabbitMQ的基礎知識介紹,可參考前幾篇博文。

備注

浣溪沙·堤上游人逐畫船

歐陽修 〔宋代〕

堤上游人逐畫船,拍堤春水四垂天。綠楊樓外出秋千。
白發戴花君莫笑,六幺催拍盞頻傳。人生何處似尊前! 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM