RabbitMQ的輪詢模式和公平分發(二)


目錄

本系列向大家介紹RabbitMQ的簡單用法;

1. RabbitMQ的簡單實踐

2. RabbitMQ的輪詢模式和公平分發

3. RabbitMQ的發布訂閱模式(Publish/Subscribe)

4. RabbitMQ路由模式(Routing)

5. RabbitMQ的主題(Topic)模式

一、常用的消息模式

我們在工作的使用中,經常會遇到多個消費者監聽同一個隊列的情況,模型如下圖所示:

當有多個消費者時,我們的消息會被哪個消費者消費呢,我們又該如何均衡消費者消費信息的多少呢;
主要有兩種模式:
1、輪詢模式的分發:一個消費者一條,按均分配;
2、公平分發:根據消費者的消費能力進行公平分發,處理快的處理的多,處理慢的處理的少;按勞分配;

二、輪詢模式(Round-Robin)

該模式接收消息是當有多個消費者接入時,消息的分配模式是一個消費者分配一條,直至消息消費完成;

2.1 生產者發消息到隊列

    public static void SendRoundRobinMessage()
        {
            try
            {
                var conn = GetConnection();
                var channel = conn.CreateModel();
                channel.QueueDeclare(QUEUE_NAME, false, false, false, null);
                for(var i = 0; i < 50; i++)
                {
                    var body = Encoding.UTF8.GetBytes(i.ToString());
                    channel.BasicPublish("", QUEUE_NAME, null, body);
                }
                Console.WriteLine("消息發送完成!");
                channel.Close();
                conn.Close();
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

2.2 消費者1代碼

消費者1每處理完一次消息,線程休息1秒;

		/// <summary>
		/// 輪詢分發消費者1
		/// </summary>
		static void SimpleConsumer1()
		{
			//new rabbitMqTest.RabbitMQ.MQUtils().GetMessage();
			//創建連接工廠
			ConnectionFactory factory = new ConnectionFactory
			{
				UserName = "admin",//用戶名
				Password = "admin",//密碼
				HostName = "127.0.0.1"//rabbitmq ip
			};
			//創建連接
			var connection = factory.CreateConnection();
			//創建通道
			var channel = connection.CreateModel();
			//事件基本消費者
			EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
			//接收到消息事件
			consumer.Received += (ch, ea) =>
			{
				var message = Encoding.UTF8.GetString(ea.Body);
				Console.WriteLine($"Simple Consumer1 收到消息: {message},時間{DateTime.Now}");
				Thread.Sleep(1000);
				//確認該消息已被消費
				//channel.BasicAck(ea.DeliveryTag, false);
			};
			//啟動消費者 設置為手動應答消息
			channel.BasicConsume("queue_test", true, consumer);
			Console.WriteLine("Simple Consumer1 消費者已啟動");
			Console.ReadKey();
			channel.Dispose();
			connection.Close();
		}

消費者接收消息如圖:

2.3 消費者2代碼

消費者2每處理完一次消息,線程休息3秒;

		/// <summary>
		/// 輪詢分發消費者2
		/// </summary>
		static void SimpleConsumer2()
		{
			//創建連接工廠
			ConnectionFactory factory = new ConnectionFactory
			{
				UserName = "guest",//用戶名
				Password = "guest",//密碼
				HostName = "127.0.0.1"//rabbitmq ip
			};
			//創建連接
			var connection = factory.CreateConnection();
			//創建通道
			var channel = connection.CreateModel();
			//事件基本消費者
			EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
			//接收到消息事件
			consumer.Received += (ch, ea) =>
			{
				var message = Encoding.UTF8.GetString(ea.Body);
				Console.WriteLine($"Simple Consumer2 收到消息: {message},時間{DateTime.Now}");
				Thread.Sleep(3000);
				//確認該消息已被消費
				//channel.BasicAck(ea.DeliveryTag, false);
			};
			//啟動消費者 設置為手動應答消息
			channel.BasicConsume("queue_test", true, consumer);
			Console.WriteLine("Simple 2 消費者已啟動");
			Console.ReadKey();
			channel.Dispose();
			connection.Close();
		}

消費者接收消息如圖:

2.4 輪詢分發小結

消費者1和2的消息處理能力不同,但是最后處理的消息條數相同,是“按均分配”。

三、公平分發(Fair Dispatch)

由於消息接收者處理消息的能力不同,存在處理快慢的問題,我們就需要能者多勞,處理快的多處理,處理慢的少處理;

3.1 生產者發消息到隊列

代碼如下:

  public static void SendQosMessage()
        {
            try
            {
                var conn = GetConnection();
                var channel = conn.CreateModel();
                channel.QueueDeclare(QUEUE_NAME, false, false, false, null);
                channel.BasicQos(0,1,false);
                for (var i = 0; i < 50; i++)
                {
                    var body = Encoding.UTF8.GetBytes(i.ToString());
                    channel.BasicPublish("", QUEUE_NAME, null, body);
                }
                Console.WriteLine("消息發送完成!");
                channel.Close();
                conn.Close();
            }
            catch (Exception ex)
            {
                throw ex;
            }
        }

3.2 消費者1代碼如下

為了模擬處理消息的時長,每處理完一條消息讓線程休息1s

		static void SimpleConsumer1()
		{
			//new rabbitMqTest.RabbitMQ.MQUtils().GetMessage();
			//創建連接工廠
			ConnectionFactory factory = new ConnectionFactory
			{
				UserName = "admin",//用戶名
				Password = "admin",//密碼
				HostName = "127.0.0.1"//rabbitmq ip
			};

			//創建連接
			var connection = factory.CreateConnection();
			//創建通道
			var channel = connection.CreateModel();
			channel.BasicQos(0, 1, false);
			//事件基本消費者
			EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

			//接收到消息事件
			consumer.Received += (ch, ea) =>
			{
				var message = Encoding.UTF8.GetString(ea.Body);
				Console.WriteLine($"Simple Consumer1 收到消息: {message},時間{DateTime.Now}");
				Thread.Sleep(1000);
				//確認該消息已被消費
				channel.BasicAck(ea.DeliveryTag, false);
			};
			//啟動消費者 設置為手動應答消息
			channel.BasicConsume("queue_test", false, consumer);
			Console.WriteLine("Simple 1 消費者已啟動");
			Console.ReadKey();
			channel.Dispose();
			connection.Close();
		}

處理的消息結果如圖:

3.3 消費者2處理消息較消費者1慢,代碼如下

為了模擬處理消息的時長,每處理完一條消息讓線程休息3s

static void SimpleConsumer2()
		{
			//new rabbitMqTest.RabbitMQ.MQUtils().GetMessage();
			//創建連接工廠
			ConnectionFactory factory = new ConnectionFactory
			{
				UserName = "admin",//用戶名
				Password = "admin",//密碼
				HostName = "127.0.0.1"//rabbitmq ip
			};

			//創建連接
			var connection = factory.CreateConnection();
			//創建通道
			var channel = connection.CreateModel();
			channel.BasicQos(0, 1, false);
			//事件基本消費者
			EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

			//接收到消息事件
			consumer.Received += (ch, ea) =>
			{
				var message = Encoding.UTF8.GetString(ea.Body);
				Console.WriteLine($"Simple Consumer2 收到消息: {message},時間{DateTime.Now}");
				Thread.Sleep(3000);
				//確認該消息已被消費
				channel.BasicAck(ea.DeliveryTag, false);
			};
			//啟動消費者 設置為手動應答消息
			channel.BasicConsume("queue_test", false, consumer);
			Console.WriteLine("Simple 2 消費者已啟動");
			Console.ReadKey();
			channel.Dispose();
			connection.Close();
		}

處理消息的結果如圖:

3.4 處理消息的結果

從結果可以看到,消費者1在相同時間內,處理了更多的消息;以上代碼我們實現了公平分發模式;

3.5 注意點

(1)消費者一次接收一條消息,代碼channel.BasicQos(0, 1, false);
(2) 公平分發需要消費者開啟手動應答,關閉自動應答
關閉自動應答代碼channel.BasicConsume("queue_test", false, consumer);
消費者開啟手動應答代碼:channel.BasicAck(ea.DeliveryTag, false);

四、小結

(1)當隊列里消息較多時,我們通常會開啟多個消費者處理消息;公平分發和輪詢分發都是我們經常使用的模式。
(2)輪詢分發的主要思想是“按均分配”,不考慮消費者的處理能力,所有消費者均分;這種情況下,處理能力弱的服務器,一直都在處理消息,而處理能力強的服務器,在處理完消息后,處於空閑狀態;
(3) 公平分發的主要思想是"能者多勞",按需分配,能力強的干的多。
參考文檔: https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM