上篇文章講的輪詢分發 : 1個隊列,無論多少個消費者,無論消費者處理消息的耗時長短,大家消費的數量都一樣.
而公平分發,又叫 : 能者多勞,顧名思義,處理得越快,消費得越多.
生產者
public class Producer { private const string QueueName = "test_work2_queue"; public static void Send() { //獲取一個連接 IConnection connection = ConnectionHelper.GetConnection(); //從連接中獲取一個通道 IModel channel = connection.CreateModel(); //聲明隊列 channel.QueueDeclare(QueueName, false, false, false, null); //每次只向消費者發送一條消息,消費者使用后,手動確認后,才會發送另外一條 channel.BasicQos(0, 1, false); for (int i = 0; i < 50; i++) { string msg = "hello world " + i; //發送消息 channel.BasicPublish("", QueueName, null, Encoding.Default.GetBytes(msg)); Console.WriteLine($"send {msg}"); } channel.Close(); connection.Close(); } }
消費者1
public class Consumer1 { private const string QueueName = "test_work2_queue"; public static void Receive() { //獲取連接 IConnection connection = ConnectionHelper.GetConnection(); //創建通道 IModel channel = connection.CreateModel(); //聲明隊列 channel.QueueDeclare(QueueName, false, false, false, null); channel.BasicQos(0, 1, false); //添加消費者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //注冊事件 consumer.Received += (s, e) => { byte[] bytes = e.Body; string msg = Encoding.Default.GetString(bytes); Console.WriteLine("consumer1 : " + msg); Thread.Sleep(2000);//休息2秒 channel.BasicAck(e.DeliveryTag, false);//手動確認,false表示只確認當前這條消息已收到,ture表示在當前這條消息及之前(小於 DelivertTag )的所有未確認的消息都已收到. }; //監聽隊列,第2個參數設置為手動確認.true 則為自動確認. channel.BasicConsume(QueueName, false, "", false, false, null, consumer); } }
消費者2
Console.WriteLine("consumer2 : " + msg); Thread.Sleep(1000);//休息1秒
運行效果:

由於 消費者1處理一條消息要2秒,而消費者2只要1秒,所以消費者2處理得多一些.
方法解釋:
channel.BasicQos(0, 1, false)
參數1: prefetchSize:0
參數2: prefetchCount:1 ,告訴RabbitMQ,不要同時給一個消費者推送多於1條消息,即一旦有1個消息還沒有ack(確認),則該消費者將block掉,直到有消息確認
global:true\false 是否將上面設置應用於channel,簡單點說,就是上面限制是channel級別的還是consumer級別
備注:據說prefetchSize 和global這兩項,rabbitmq沒有實現,暫且不研究.
channel.BasicAck(e.DeliveryTag, false)
參數1 : deliveryTag : e.DeliveryTag,該消息的標記 ,ulong 類型.
參數2 : multiple:是否批量.true:將一次性確認所有小於 deliveryTag 的消息.
