參考資料:https://www.rabbitmq.com/maxlength.html
RabbitMQ 有兩種方式限制隊列長度,第一種是對隊列中消息總數進行限制:
gordon.study.rabbitmq.features.TestQueueLengthLimit.java
public class TestQueueLengthLimit {
private static final String QUEUE_NAME = "queueLengthLimit";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel senderChannel = connection.createChannel();
Channel consumerChannel = connection.createChannel();
// 設置隊列最大消息數量為5
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length", 5);
senderChannel.queueDeclare(QUEUE_NAME, false, false, true, args);
// 發布6個消息
for (int i = 0; i < 6;) {
String message = "NO. " + ++i;
senderChannel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
}
// 獲取的消息為 NO. 2,說明隊列頭部第一條消息被拋棄
Thread.sleep(100);
GetResponse resp = consumerChannel.basicGet(QUEUE_NAME, false);
String message = new String(resp.getBody(), "UTF-8");
System.out.printf("consume: %s\n", message);
System.out.printf("queue size: %d\n", resp.getMessageCount());
// 現在隊列中有4個 Ready消息,1個 Unacked消息。此時再發布兩條消息,應該只有 NO. 3 被拋棄。
senderChannel.basicPublish("", QUEUE_NAME, null, "NO. 7".getBytes("UTF-8"));
senderChannel.basicPublish("", QUEUE_NAME, null, "NO. 8".getBytes("UTF-8"));
Thread.sleep(100);
GetResponse resp2 = consumerChannel.basicGet(QUEUE_NAME, false);
message = new String(resp2.getBody(), "UTF-8");
System.out.printf("consume: %s\n\n", message);
// 現在隊列中有4個 Ready消息,2個 Unacked消息。
// 此時Nack,消息2、4取消退回隊列頭導致隊列消息數量超過設定值,誰能留下?
consumerChannel.basicNack(resp2.getEnvelope().getDeliveryTag(), true, true);
Thread.sleep(100);
while (true) {
resp = consumerChannel.basicGet(QUEUE_NAME, true);
if (resp == null) {
break;
} else {
message = new String(resp.getBody(), "UTF-8");
System.out.printf("consume: %s\n", message);
}
}
}
}
顧名思義,x-max-length 參數限制了一個隊列的消息總數,當消息總數達到限定值時,隊列頭的消息會被拋棄。
此外,處於 Unacked 狀態的消息不納入消息總數計算。但是,當 Unacked 消息被 reject 並重新入隊時,就會受 x-max-length 參數限制,可能回不了隊列。
第二種是對隊列中消息體總字節數進行限制:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length-bytes ", 1000);
senderChannel.queueDeclare(QUEUE_NAME, false, false, true, args);
只計算消息體的字節數,不算消息頭、消息屬性等字節數。
