簡介
本節主要討論隊列聲明的各個參數
queueDeclare(String queue, boolean durable, boolean exclusive, Map<String, Object> arguments);
-
queue: 隊列名稱
-
durable: 是否持久化, 隊列的聲明默認是存放到內存中的,如果rabbitmq重啟會丟失,如果想重啟之后還存在就要使隊列持久化,保存到Erlang自帶的Mnesia數據庫中,當rabbitmq重啟之后會讀取該數據庫
-
exclusive:是否排外的,有兩個作用,一:當連接關閉時connection.close()該隊列是否會自動刪除;二:該隊列是否是私有的private,如果不是排外的,可以使用兩個消費者都訪問同一個隊列,沒有任何問題,如果是排外的,會對當前隊列加鎖,其他通道channel是不能訪問的,如果強制訪問會報異常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method:
#method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)
一般等於true的話用於一個隊列只能有一個消費者來消費的場景 -
autoDelete:是否自動刪除,當最后一個消費者斷開連接之后隊列是否自動被刪除,可以通過RabbitMQ Management,查看某個隊列的消費者數量,當consumers = 0時隊列就會自動刪除
-
arguments:
隊列中的消息什么時候會自動被刪除?-
Message TTL(x-message-ttl):設置隊列中的所有消息的生存周期(統一為整個隊列的所有消息設置生命周期), 也可以在發布消息的時候單獨為某個消息指定剩余生存時間,單位毫秒, 類似於redis中的ttl,生存時間到了,消息會被從隊里中刪除,注意是消息被刪除,而不是隊列被刪除, 特性Features=TTL, 單獨為某條消息設置過期時間AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().expiration(“6000”);
channel.basicPublish(EXCHANGE_NAME, “”, properties.build(), message.getBytes(“UTF-8”)); -
Auto Expire(x-expires): 當隊列在指定的時間沒有被訪問(consume, basicGet, queueDeclare…)就會被刪除,Features=Exp
-
Max Length(x-max-length): 限定隊列的消息的最大值長度,超過指定長度將會把最早的幾條刪除掉, 類似於mongodb中的固定集合,例如保存最新的100條消息, Feature=Lim
-
Max Length Bytes(x-max-length-bytes): 限定隊列最大占用的空間大小, 一般受限於內存、磁盤的大小, Features=Lim B
-
Dead letter exchange(x-dead-letter-exchange): 當隊列消息長度大於最大長度、或者過期的等,將從隊列中刪除的消息推送到指定的交換機中去而不是丟棄掉,Features=DLX
-
Dead letter routing key(x-dead-letter-routing-key):將刪除的消息推送到指定交換機的指定路由鍵的隊列中去, Feature=DLK
-
Maximum priority(x-max-priority):優先級隊列,聲明隊列時先定義最大優先級值(定義最大值一般不要太大),在發布消息的時候指定該消息的優先級, 優先級更高(數值更大的)的消息先被消費,
- Lazy mode(x-queue-mode=lazy): Lazy Queues: 先將消息保存到磁盤上,不放在內存中,當消費者開始消費的時候才加載到內存中
- Master locator(x-queue-master-locator)
-
注意
關於隊列的聲明,如果使用同一套參數進行聲明了,就不能再使用其他參數來聲明,要么刪除該隊列重新刪除,可以使用命令行刪除也可以在RabbitMQ Management上刪除,要么給隊列重新起一個名字。
隊列持久化
重啟RabbitMQ服務器(可以通過rabbitmqctl stop_app關閉服務器,rabbitmqctl start_app重啟服務器),可以登錄RabbitMQ Management—> Queues中可以看到之前聲明的隊列還存在
boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, arguments);
消息持久化
設置消息持久化必須先設置隊列持久化,要不然隊列不持久化,消息持久化,隊列都不存在了,消息存在還有什么意義。消息持久化需要將交換機持久化、隊列持久化、消息持久化,才能最終達到持久化的目的
方式一:設置deliveryMode=2
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true); channel.queueDeclare(QUEUE_NAME, true, false, false, arguments); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); String message = "Hello RabbitMQ: "; // 設置消息持久化 AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder(); properties.deliveryMode(2); // 設置消息是否持久化,1: 非持久化 2:持久化 channel.basicPublish(EXCHANGE_NAME, "", properties.build(), message.getBytes("UTF-8"));
方式二:設置BasicProperties為MessageProperties.PERSISTENT_TEXT_PLAIN
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); String message = "Hello RabbitMQ: "; channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
Message TTL消息剩余生存時間
統一設置隊列中的所有消息的過期時間,例如設置10秒,10秒后這個隊列的消息清零
方式一:為該隊列的所有消息統一設置相同的聲明周期
Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-message-ttl", 10000); // 聲明隊列時指定隊列中的消息過期時間 channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
方式二:單獨為某條消息單獨設置時間
// expiration: 設置單條消息的過期時間
channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); for(int i = 1; i <= 5; i++) { AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties() .builder().expiration( i * 1000 + ""); channel.basicPublish(EXCHANGE_NAME, "", properties.build(), (message + i).getBytes("UTF-8")); }
Auto Expire自動過期
x-expires用於當多長時間沒有消費者訪問該隊列的時候,該隊列會自動刪除,可以設置一個延遲時間,如僅啟動一個生產者,10秒之后該隊列會刪除,或者啟動一個生產者,再啟動一個消費者,消費者運行結束后10秒,隊列也會被刪除
Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-expires", 10000); channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
Max Length最大長度
x-max-length:用於指定隊列的長度,如果不指定,可以認為是無限長,例如指定隊列的長度是4,當超過4條消息,前面的消息將被刪除,給后面的消息騰位
Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-max-length", 4); channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); for(int i = 1; i <= 5; i++) { channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, (message + i).getBytes("UTF-8")); }
Max Length Bytes代碼片段
x-max-length-bytes: 用於指定隊列存儲消息的占用空間大小,當達到最大值是會刪除之前的數據騰出空間
Map<String, Object> arguments = new HashMap<String, Object>(); rguments.put("x-max-length-bytes", 1024); channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
Maximum priority最大優先級
x-max-priority: 設置消息的優先級,優先級值越大,越被提前消費。
正常情況下不適用優先級
Hello RabbitMQ: 1
Hello RabbitMQ: 2
Hello RabbitMQ: 3
Hello RabbitMQ: 4
Hello RabbitMQ: 5
使用優先級順序正好相反
Hello RabbitMQ: 5
Hello RabbitMQ: 4
Hello RabbitMQ: 3
Hello RabbitMQ: 2
Hello RabbitMQ: 1
Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); for(int i = 1; i <= 5; i++) { AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties() .builder().priority(i); channel.basicPublish(EXCHANGE_NAME, "", properties.build(), (message + i).getBytes("UTF-8")); }
Dead letter exchange(死亡交換機) 和 Dead letter routing key(死亡路由鍵)
當隊列中的消息過期,或者達到最大長度而被刪除,或者達到最大空間時而被刪除時,可以將這些被刪除的信息推送到其他交換機中,讓其他消費者訂閱這些被刪除的消息,處理這些消息
public void testBasicPublish() throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(AMQP.PROTOCOL.PORT); factory.setUsername("mengday"); factory.setPassword("mengday"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 聲明一個接收被刪除的消息的交換機和隊列 String EXCHANGE_DEAD_NAME = "exchange.dead"; String QUEUE_DEAD_NAME = "queue_dead"; channel.exchangeDeclare(EXCHANGE_DEAD_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, null); channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "routingkey.dead"); String EXCHANGE_NAME = "exchange.fanout"; String QUEUE_NAME = "queue_name"; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-message-ttl", 15000); arguments.put("x-max-length", 4); arguments.put("x-max-length-bytes", 1024); arguments.put("x-expires", 30000); arguments.put("x-dead-letter-exchange", "exchange.dead"); arguments.put("x-dead-letter-routing-key", "routingkey.dead"); channel.queueDeclare(QUEUE_NAME, true, false, false, arguments); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); String message = "Hello RabbitMQ: "; for(int i = 1; i <= 5; i++) { channel.basicPublish(EXCHANGE_NAME, "", null, (message + i).getBytes("UTF-8")); } channel.close(); connection.close(); }
剛開始由於隊列長度是4,總共發送5條消息,所以最早進入隊列的消息1將被刪除掉,被推送到“死亡隊列”中,所以看到普通隊列的消息為4條,死亡隊列的消息為1條,是消息1
隨着時間的流逝,普通隊列中的消息都該過期了,所以消息2、3、4、5都被推送到死亡隊列,所以死亡隊列消息是5條,普通隊列的消息條數為0
再隨着時間的流逝,普通隊列過了指定時間沒有被消費者訪問,這個隊列自動被刪除了,所以看不到普通隊列了,只有死亡隊列
查看死亡隊列的消息可以得知,消息一死亡的原因是maxlen達到了最大長度,消息2、3、4、5都是因為生存時間到了導致死亡的
一個比較雜的綜合示例
關於消費者就不用代碼來獲取消息了,直接在RabbitMQ Management點擊某個隊列的名字,然后Get Message(s) 即可獲取
該示例使用很多參數配置,可能實際使用不會像這樣用,因為這樣好像不太配套。
public class Producer { @Test public void testBasicPublish() throws IOException, TimeoutException, InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("127.0.0.1"); factory.setPort(AMQP.PROTOCOL.PORT); factory.setUsername("mengday"); factory.setPassword("mengday"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 聲明一個接收被刪除的消息的交換機和隊列 String EXCHANGE_DEAD_NAME = "exchange.dead"; String QUEUE_DEAD_NAME = "queue_dead"; channel.exchangeDeclare(EXCHANGE_DEAD_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare(QUEUE_DEAD_NAME, false, false, false, null); channel.queueBind(QUEUE_DEAD_NAME, EXCHANGE_DEAD_NAME, "routingkey.dead"); String EXCHANGE_NAME = "exchange.fanout"; String QUEUE_NAME = "queue_name"; channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); Map<String, Object> arguments = new HashMap<String, Object>(); // 統一設置隊列中的所有消息的過期時間 arguments.put("x-message-ttl", 30000); // 設置超過多少毫秒沒有消費者來訪問隊列,就刪除隊列的時間 arguments.put("x-expires", 20000); // 設置隊列的最新的N條消息,如果超過N條,前面的消息將從隊列中移除掉 arguments.put("x-max-length", 4); // 設置隊列的內容的最大空間,超過該閾值就刪除之前的消息 arguments.put("x-max-length-bytes", 1024); // 將刪除的消息推送到指定的交換機,一般x-dead-letter-exchange和x-dead-letter-routing-key需要同時設置 arguments.put("x-dead-letter-exchange", "exchange.dead"); // 將刪除的消息推送到指定的交換機對應的路由鍵 arguments.put("x-dead-letter-routing-key", "routingkey.dead"); // 設置消息的優先級,優先級大的優先被消費 arguments.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, false, false, false, arguments); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); String message = "Hello RabbitMQ: "; for(int i = 1; i <= 5; i++) { // expiration: 設置單條消息的過期時間 AMQP.BasicProperties.Builder properties = new AMQP.BasicProperties().builder().priority(i).expiration( i * 1000 + ""); channel.basicPublish(EXCHANGE_NAME, "", properties.build(), (message + i).getBytes("UTF-8")); } channel.close(); connection.close(); } }
運行效果
疑惑: 單獨使用arguments.put(“x-max-length”, 4); arguments.put(“x-dead-letter-exchange”, “exchange.dead”);arguments.put(“x-dead-letter-routing-key”, “routingkey.dead”);發現消息1先會觸發maxlen條件,而被推送到queue_dead隊列中,由此可以得出,當達到最大長度時,先刪除的是先被添加到隊列的消息。但是如果很多條件一塊同時使用可能現象不太好解釋,如上例如,實際結果是消息5因為maxlen而被推送到死亡隊列中,消息1、2、3、4都是由於expired過期導致的,難道不是消息1由於maxlen被推送到希望隊列,而2、3、4、5是由於過期導致的嗎?還有上面代碼如果將x-max-len該為3,在死亡隊列中獲取消息的先后順序是4、5、3、2、1不是優先級高的先被消費嗎,為啥不是5、4、3、2、1 難道是條件用的太多了,都亂了??? 明白的同學請留言,謝謝!
經過實際測試,參數單獨用或者和其他參數合理搭配使用都沒問題,如果是像上例一塊都用,大雜燴,搞不懂結果。
// 檢查隊列是否存在,不存在拋異常
channel.queueDeclarePassive(“queue_name”);