RabbitMQ (四) 工作隊列之公平分發


上篇文章講的輪詢分發 : 1個隊列,無論多少個消費者,無論消費者處理消息的耗時長短,大家消費的數量都一樣.

而公平分發,又叫 : 能者多勞,顧名思義,處理得越快,消費得越多.

 

生產者

    public class Producer
    {
        private const string QueueName = "test_work2_queue";

        public static void Send()
        {
            //獲取一個連接
            IConnection connection = ConnectionHelper.GetConnection();

            //從連接中獲取一個通道
            IModel channel = connection.CreateModel();

            //聲明隊列
            channel.QueueDeclare(QueueName, false, false, false, null);

            //每次只向消費者發送一條消息,消費者使用后,手動確認后,才會發送另外一條
            channel.BasicQos(0, 1, false);

            for (int i = 0; i < 50; i++)
            {
                string msg = "hello world " + i;

                //發送消息
                channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg));
                Console.WriteLine($"send {msg}");
            }

            channel.Close();
            connection.Close();
        }
    }

 

消費者1

    public class Consumer1
    {
        private const string QueueName = "test_work2_queue";
        public static void Receive()
        {
            //獲取連接
            IConnection connection = ConnectionHelper.GetConnection();

            //創建通道
            IModel channel = connection.CreateModel();

            //聲明隊列
            channel.QueueDeclare(QueueName, false, false, false, null);

     channel.BasicQos(0, 1, false);

            //添加消費者
            EventingBasicConsumer consumer = new EventingBasicConsumer(channel);

            //注冊事件
            consumer.Received += (s, e) =>
            {
                byte[] bytes = e.Body;
                string msg = Encoding.Default.GetString(bytes);
                Console.WriteLine("consumer1 : " + msg);
                Thread.Sleep(2000);//休息2秒
                channel.BasicAck(e.DeliveryTag, false);//手動確認,false表示只確認當前這條消息已收到,ture表示在當前這條消息及之前(小於 DelivertTag )的所有未確認的消息都已收到.
            };

            //監聽隊列,第2個參數設置為手動確認.true 則為自動確認.           
            channel.BasicConsume(QueueName, false, "", false, false, null, consumer);
        }
    }

 

消費者2

                Console.WriteLine("consumer2 : " + msg);
                Thread.Sleep(1000);//休息1秒

 

運行效果:

 

由於 消費者1處理一條消息要2秒,而消費者2只要1秒,所以消費者2處理得多一些.

 

方法解釋:

channel.BasicQos(0, 1, false)

參數1: prefetchSize:0 

參數2: prefetchCount:1 ,告訴RabbitMQ,不要同時給一個消費者推送多於1條消息,即一旦有1個消息還沒有ack(確認),則該消費者將block掉,直到有消息確認

global:true\false 是否將上面設置應用於channel,簡單點說,就是上面限制是channel級別的還是consumer級別

備注:據說prefetchSize 和global這兩項,rabbitmq沒有實現,暫且不研究.

 

channel.BasicAck(e.DeliveryTag, false)

參數1 : deliveryTag : e.DeliveryTag,該消息的標記 ,ulong 類型.
參數2 : multiple:是否批量.true:將一次性確認所有小於 deliveryTag 的消息. 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM