介紹
Producer
:消息的生產者(發送消息的程序)。Queue
:消息隊列,理解為一個容器,生產者向它發送消息,它把消息存儲,等待消費者消費。Consumer
:消息的消費者(接收消息的程序)。
由圖所示,簡單隊列模式,一個生產者,經過一個隊列,對應一個消費者。可以看做是點對點的一種傳輸方式,相較架構模型圖,最主要的特點就是看不到 Exchange(交換機) 和 routekey(路由鍵) ,正是因為這種模式簡單,所以並不會涉及到復雜的條件分發等等,因此也不需要用戶去顯式的考慮交換機和路由鍵的問題。
- 但是要注意,這種模式並不是生產者直接對接隊列,而是用了默認的交換機,默認的交換機會把消息發送到和 routekey 名稱相同的隊列中去,這也是我們在后面代碼中在 routekey 位置填寫了隊列名稱的原因
.net 5.0 NuGet引用包
RabbitMQ.Client
簡單測試代碼:
- 生產者實現
using RabbitMQ.Client; using System; using System.Text; namespace RabbitMQTest.Producer { /// <summary> /// RabbitMQ測試_Producer生產者 /// </summary> class Program { static void Main(string[] args) { // 1、創建連接工廠 IConnectionFactory factory = new ConnectionFactory() { UserName = "admin", Password = "admin", HostName = "192.168.1.101", //rabbitmq server ip Port = 5672, VirtualHost = "testhost" //在此連接期間要訪問的虛擬主機。 }; // 2、創建連接 IConnection connection = factory.CreateConnection(); // 3、創建通道 IModel channel = connection.CreateModel(); string name = "testQueue"; // 4、聲明一個隊列 channel.QueueDeclare( queue: name, //消息隊列名稱 durable: false, //是否持久化,true持久化,隊列會保存磁盤,服務器重啟時可以保證不丟失相關信息。 exclusive: false, //是否排他,true排他的,如果一個隊列聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。 autoDelete: false, //是否自動刪除,true是自動刪除,自動刪除的前提是:致少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除。 arguments: null); //設置隊列的一些其它參數。 Console.WriteLine("\n RabbitMQ連接成功,請輸入消息,輸入exit退出!"); string input; do { input = Console.ReadLine(); byte[] sendBytes = Encoding.UTF8.GetBytes(input); //發布消息 channel.BasicPublish("", name, null, sendBytes); Console.WriteLine("消息發布完畢"); } while (input.Trim().ToLower() != "exit"); Console.WriteLine("\n RabbitMQ測試完畢!"); // 6、關閉通道 channel.Close(); // 7、關閉連接 connection.Close(); } } }
- queueDeclare 方法解釋
- 參數1:queue(隊列名稱),如果隊列不存在,則自動創建。
- 參數2:durable(隊列是否持久化),持久化可以保證服務器重啟后此隊列仍然存在。
- 參數3:exclusive(排他隊列)即是否獨占隊列,如果此項為 true,該隊列僅對首次申明它的連接可見,並在連接斷開時自動刪除。
- 參數4:autoDelete(自動刪除),最后一個消費者將消息消費完畢后,自動刪除隊列。
- 參數5:arguments(攜帶附加屬性)。
- basicPublish 方法解釋
- 參數1:exchange(交換機名稱)。
- 參數2:routingKey(路由key),此處填寫隊列名,可理解為把消息發送到和 routekey 名稱相同的隊列中去。
- 參數3:props(消息的控制狀態),可以在此處控制消息的持久化。
- 參數為:MessageProperties.PERSISTENT_TEXT_PLAIN
- 參數4:body(消息主體),類型是一個字節數組,要轉一下類型。
通過工具關閉channel和釋放連接:先關閉通道,再釋放連接。
- 消費者實現
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; using System.Threading; namespace RabbitMQTest.Consumer { /// <summary> /// RabbitMQ測試_Consumer消費者 /// </summary> class Program { static void Main(string[] args) { // 1、創建連接工廠 ConnectionFactory factory = new ConnectionFactory() { UserName = "admin", //用戶名 Password = "admin", //密碼 HostName = "192.168.1.101", //rabbitmq server ip Port = 5672, //端口號 VirtualHost = "testhost" //在此連接期間要訪問的虛擬主機。 }; // 2、創建連接 IConnection connection = factory.CreateConnection(); // 3、創建通道 IModel channel = connection.CreateModel(); // 4、事件基本消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); string name = "testQueue"; // 5、接收到消息事件 consumer.Received += (ch, ea) => { string message = Encoding.Default.GetString(ea.Body.ToArray()); Console.WriteLine($"接受到消息:{message}"); Console.WriteLine($"收到該消息[{ea.DeliveryTag}] 延遲1s發送回執!"); Thread.Sleep(1000); // 6、確認該消費已被消費 channel.BasicAck(ea.DeliveryTag, false); Console.WriteLine($"已發送回執{ea.DeliveryTag}"); }; // 7、啟動消費者 設置為手動應答消息 channel.BasicConsume(name, false, consumer); Console.WriteLine("消費者已啟動"); Console.ReadKey(); channel.Close(); connection.Close(); } } }
- basicConsume 方法解釋
- 參數1:queue(隊列名稱),即消費哪個隊列的消息 。
- 參數2:autoAck(自動應答)開始消息的自動確認機制,只要消費了就從隊列刪除消息。
- 參數3:callback(消費時的回調接口),callback 的類型是 Consumer 這里使用了 DefaultConsumer 就是 Consumer 的一個實現類。其中重寫 handleDelivery 方法,就可以獲取到消費的數據內容了,這里主要使用了其中的 body,即查看消息主體,其他三個參數暫時還沒用到,有興趣可以先打印輸出一下,能先有個大概的了解。