在RabbitMQ中,可以設置消息的優先級,也就相當於在隊列中置頂某條消息,讓某個消息優先得到處理的功能。
既然是設置消息的優先級,那么就是針對生產者,也就是消息發布端。
設置消息的優先級一共有2個步驟:
1、設置隊列的x-max-priority參數;
2、設置消息的Priority參數。
話不多說,上代碼!
發送端:
var factory = new ConnectionFactory() { HostName = "localhost",UserName="ty2017",Password="123456",VirtualHost="log" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //聲明一個隊列,設置x-max-priority參數 channel.QueueDeclare("q.test", true, false, false, new Dictionary<string, object> { { "x-max-priority", 30 } }); for (int i = 0; i < 6; i++) { var body = Encoding.UTF8.GetBytes(string.Format("第{0}個消息",i+1)); var properties = channel.CreateBasicProperties(); properties.SetPersistent(true); //設置消息的優先級 properties.Priority = (byte)((i == 3)?30:i); //發布消息 channel.BasicPublish(exchange: "", routingKey: "q.test", basicProperties: properties, body: body); } } }
將第四個消息的優先級設置為最大,我們打開Consumer端,看看第四個消息是否被優先處理!
Consumer端代碼如下:
var factory = new ConnectionFactory() { HostName = "localhost", UserName = "ty2017", Password = "123456",VirtualHost="log" }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare("q.test", true, false, false, new Dictionary<string, object> { { "x-max-priority", 30 } }); channel.BasicQos(0, 1, false); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine("優先級:{0} 消息內容:{1}",ea.BasicProperties.Priority, message); channel.BasicAck(ea.DeliveryTag, false); }; channel.BasicConsume(queue: "q.test", noAck: false, consumer: consumer); Console.ReadLine(); } }
運行Consumer端程序,得到如下結果:
根據結果可以看出,優先級越大的消息越是被優先處理!