RabbitMQ做為消息代理,負責接收和轉發消息,可以將RabbitMQ比喻為一個郵筒、一個郵局和一個郵遞員。本文主要以一個簡單的小例子,簡述RabbitMQ實現消息傳輸的相關內容,僅供學習分享使用,如有不足之處,還請指正。
消息隊列模型
所有 MQ 產品從模型抽象上來說都是一樣的過程:
消費者(consumer)訂閱某個隊列。生產者(producer)創建消息,然后發布到隊列(queue)中,最后將消息發送到監聽的消費者。
RabbitMQ設置
RabbitMQ是通過交換機將消息轉發到對應隊列,所以隊列需要和交換機進行綁定。本例將隊列綁定到默認的amq.direct交換機,並設置Routing key,如下圖所示:
RabbitMQ動態庫安裝
通過NuGet包管理器進行安裝RabbitMQ.Client,如下所示:
RabbitMQ.Client相關知識點
- ConnectionFactory:構造一個實例,主要創建連接。
- IConnection:表示一個基於AMQP協議的連接。
- IModel:表示一個RabbitMQ通道,可用於聲明一個隊列,然后開始消費。
- EventingBasicConsumer:基於獨立事件監聽的基礎消費者,可以監聽並接收消息。
- 生產者基本步驟:1. 創建連接 2. 基於連接創建通道 3. 基於通道聲明隊列,4. 開始生產並發布消息
- 消費者基本步驟:1. 創建連接 2. 基於連接創建通道 3. 基於通道聲明隊列,4. 創建消費者,5. 綁定通道和消費者,並開始消費
示例效果圖
本例主要有一個生產者,一個消費者,通過消息隊列進行消息轉發和接收。
生產者負責消息發送,如下圖所示:
消費者負責消息接收,如下圖所示:
核心代碼
代碼結構:主要包括生產者,消費者,公共基礎代碼,如下所示:
RabbitMqHelper主要創建連接,如下所示:
1 public class RabbitMqHelper 2 { 3 4 /// <summary> 5 /// 創建連接 6 /// </summary> 7 /// <returns></returns> 8 public IConnection GetConnection() 9 { 10 try 11 { 12 var factory = new ConnectionFactory() 13 { 14 HostName = "127.0.0.1", 15 Port = 5672, 16 UserName = "guest", 17 Password = "guest", 18 VirtualHost = "/ShortMsgHost" 19 }; 20 var conn = factory.CreateConnection(); 21 return conn; 22 } 23 catch (Exception ex) { 24 throw ex; 25 } 26 } 27 28 29 30 }
RabbmitMqSendHelper用於發送消息,如下所示:
1 public class RabbmitMqSendHelper : RabbitMqHelper 2 { 3 /// <summary> 4 /// 發送消息 5 /// </summary> 6 /// <param name="msg"></param> 7 /// <returns></returns> 8 public bool SendMsg(string msg) 9 { 10 try 11 { 12 using (var conn = GetConnection()) 13 { 14 using (var channel = conn.CreateModel()) 15 { 16 channel.QueueDeclare(queue: "ShortMsgQueue", 17 durable: true, 18 exclusive: false, 19 autoDelete: false, 20 arguments: null); 21 var body = Encoding.UTF8.GetBytes(msg); 22 23 channel.BasicPublish(exchange: "amq.direct", 24 routingKey: "ShortMsgKey", 25 basicProperties: null, 26 body: body); 27 28 //Console.WriteLine(" [x] Sent {0}", message); 29 }; 30 }; 31 return true; 32 } 33 catch (Exception ex) 34 { 35 throw ex; 36 } 37 } 38 }
RabbitMqReceiveHelper主要用於接收信息,如下所示:
1 public class RabbitMqReceiveHelper : RabbitMqHelper 2 { 3 public RabbitMqReceiveEventHandler OnReceiveEvent; 4 5 private IConnection conn; 6 7 private IModel channel; 8 9 private EventingBasicConsumer consumer; 10 11 public bool StartReceiveMsg() 12 { 13 try 14 { 15 conn = GetConnection(); 16 17 channel = conn.CreateModel(); 18 19 channel.QueueDeclare(queue: "ShortMsgQueue", 20 durable: true, 21 exclusive: false, 22 autoDelete: false, 23 arguments: null); 24 25 consumer = new EventingBasicConsumer(channel); 26 consumer.Received += (model, ea) => 27 { 28 var body = ea.Body.ToArray(); 29 var message = Encoding.UTF8.GetString(body); 30 //Console.WriteLine(" [x] Received {0}", message); 31 if (OnReceiveEvent != null) 32 { 33 OnReceiveEvent(message); 34 } 35 }; 36 channel.BasicConsume(queue: "ShortMsgQueue", 37 autoAck: true, 38 consumer: consumer); 39 return true; 40 } 41 catch (Exception ex) 42 { 43 throw ex; 44 } 45 } 46 }
關於RabbitMQ的基礎知識介紹,可參考前幾篇博文。
備注
浣溪沙·堤上游人逐畫船
歐陽修 〔宋代〕
堤上游人逐畫船,拍堤春水四垂天。綠楊樓外出秋千。
白發戴花君莫笑,六幺催拍盞頻傳。人生何處似尊前!
白發戴花君莫笑,六幺催拍盞頻傳。人生何處似尊前!