RabbitMQ - 常用消息隊列之:簡單隊列(一對一模式)


 介紹

 

  • Producer:消息的生產者(發送消息的程序)。
  • Queue:消息隊列,理解為一個容器,生產者向它發送消息,它把消息存儲,等待消費者消費。
  • Consumer:消息的消費者(接收消息的程序)。

  由圖所示,簡單隊列模式,一個生產者,經過一個隊列,對應一個消費者。可以看做是點對點的一種傳輸方式,相較架構模型圖,最主要的特點就是看不到 Exchange(交換機) 和 routekey(路由鍵) ,正是因為這種模式簡單,所以並不會涉及到復雜的條件分發等等,因此也不需要用戶去顯式的考慮交換機和路由鍵的問題。

  • 但是要注意,這種模式並不是生產者直接對接隊列,而是用了默認的交換機,默認的交換機會把消息發送到和 routekey 名稱相同的隊列中去,這也是我們在后面代碼中在 routekey 位置填寫了隊列名稱的原因
 .net 5.0 NuGet引用包

 RabbitMQ.Client

 簡單測試代碼:
  •  生產者實現
using RabbitMQ.Client;
using System;
using System.Text;

namespace RabbitMQTest.Producer
{
    /// <summary>
    /// RabbitMQ測試_Producer生產者
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            // 1、創建連接工廠
            IConnectionFactory factory = new ConnectionFactory()
            {
                UserName = "admin",
                Password = "admin",
                HostName = "192.168.1.101", //rabbitmq server ip
                Port = 5672,
                VirtualHost = "testhost" //在此連接期間要訪問的虛擬主機。
            };
            // 2、創建連接
            IConnection connection = factory.CreateConnection();
            // 3、創建通道
            IModel channel = connection.CreateModel();
            string name = "testQueue";
            // 4、聲明一個隊列
            channel.QueueDeclare(
                queue: name, //消息隊列名稱
                durable: false, //是否持久化,true持久化,隊列會保存磁盤,服務器重啟時可以保證不丟失相關信息。
                exclusive: false, //是否排他,true排他的,如果一個隊列聲明為排他隊列,該隊列僅對首次聲明它的連接可見,並在連接斷開時自動刪除。
                autoDelete: false, //是否自動刪除,true是自動刪除,自動刪除的前提是:致少有一個消費者連接到這個隊列,之后所有與這個隊列連接的消費者都斷開時,才會自動刪除。
                arguments: null); //設置隊列的一些其它參數。

            Console.WriteLine("\n RabbitMQ連接成功,請輸入消息,輸入exit退出!");
            string input;
            do
            {
                input = Console.ReadLine();
                byte[] sendBytes = Encoding.UTF8.GetBytes(input);
                //發布消息
                channel.BasicPublish("", name, null, sendBytes);
                Console.WriteLine("消息發布完畢");
            } while (input.Trim().ToLower() != "exit");
            Console.WriteLine("\n RabbitMQ測試完畢!");
            // 6、關閉通道
            channel.Close();
            // 7、關閉連接
            connection.Close();
        }
    }
}
  • queueDeclare 方法解釋
    •  參數1:queue(隊列名稱),如果隊列不存在,則自動創建。
    •  參數2:durable(隊列是否持久化),持久化可以保證服務器重啟后此隊列仍然存在。
    •  參數3:exclusive(排他隊列)即是否獨占隊列,如果此項為 true,該隊列僅對首次申明它的連接可見,並在連接斷開時自動刪除。
    •  參數4:autoDelete(自動刪除),最后一個消費者將消息消費完畢后,自動刪除隊列。
    •  參數5:arguments(攜帶附加屬性)。
  • basicPublish 方法解釋
    •  參數1:exchange(交換機名稱)。
    •  參數2:routingKey(路由key),此處填寫隊列名,可理解為把消息發送到和 routekey 名稱相同的隊列中去。
    •  參數3:props(消息的控制狀態),可以在此處控制消息的持久化。
      • 參數為:MessageProperties.PERSISTENT_TEXT_PLAIN
    • 參數4:body(消息主體),類型是一個字節數組,要轉一下類型。

通過工具關閉channel和釋放連接:先關閉通道,再釋放連接。

  • 消費者實現
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
using System.Threading;

namespace RabbitMQTest.Consumer
{
    /// <summary>
    /// RabbitMQ測試_Consumer消費者
    /// </summary>
    class Program
    {
        static void Main(string[] args)
        {
            // 1、創建連接工廠
            ConnectionFactory factory = new ConnectionFactory()
            { 
                UserName = "admin", //用戶名
                Password = "admin", //密碼
                HostName = "192.168.1.101", //rabbitmq server ip
                Port = 5672, //端口號
                VirtualHost = "testhost" //在此連接期間要訪問的虛擬主機。

            };
            // 2、創建連接
            IConnection connection = factory.CreateConnection();
            // 3、創建通道
            IModel channel = connection.CreateModel();
            // 4、事件基本消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
            string name = "testQueue";
            // 5、接收到消息事件
            consumer.Received += (ch, ea) =>
              {
                  string message = Encoding.Default.GetString(ea.Body.ToArray());
                  Console.WriteLine($"接受到消息:{message}");
                  Console.WriteLine($"收到該消息[{ea.DeliveryTag}] 延遲1s發送回執!");
                  Thread.Sleep(1000);
                  // 6、確認該消費已被消費
                  channel.BasicAck(ea.DeliveryTag, false);
                  Console.WriteLine($"已發送回執{ea.DeliveryTag}");
              };
            // 7、啟動消費者 設置為手動應答消息
            channel.BasicConsume(name, false, consumer);

            Console.WriteLine("消費者已啟動");
            Console.ReadKey();
            channel.Close();
            connection.Close();
        }
    }
}
  • basicConsume 方法解釋
    • 參數1:queue(隊列名稱),即消費哪個隊列的消息 。
    • 參數2:autoAck(自動應答)開始消息的自動確認機制,只要消費了就從隊列刪除消息。
    • 參數3:callback(消費時的回調接口),callback 的類型是 Consumer 這里使用了 DefaultConsumer 就是 Consumer 的一個實現類。其中重寫 handleDelivery 方法,就可以獲取到消費的數據內容了,這里主要使用了其中的 body,即查看消息主體,其他三個參數暫時還沒用到,有興趣可以先打印輸出一下,能先有個大概的了解。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM