在RabbitMQ(二)AMQP協議mandatory和immediate標志位區別中我們提到,在RabbitMQ3.0以后的版本里,去掉了immediate參數支持,要實現類似的確認功能要使用TTL和DLX。
TTL,Time-To-Live Extensions(過期時間)
RabbitMQ 允許你對 message 和 queue 設置 TTL 值。
Per-Queue Message TTL
通過在 queue.declare 中設置 x-message-ttl 參數,可以控制被 publish 到 queue 中的 message 被丟棄前能夠存活的時間。當某個 message 在 queue 留存的時間超過了配置的 TTL 值時,我們說該 message “已死”。值得注意的是,當一個 message 被路由到多個 queue 中時,其可以在不同的時間“死掉”,或者可能有的不會出現“死掉”情況。在某個 queue 中的某個 message 的“死亡”不會對相同 message 在其他 queue 中的生存狀況。
服務器會保證“死掉”的 message 將不會被包括在任何的 basic.get-ok 或 basic.deliver 方法中。更進一步,服務器將努力在 TTL 到期或到期后的短時間內處理掉該 message 。
參數 x-message-ttl 的值 必須是非負 32 位整數 (0 <= n <= 2^32-1) ,以毫秒為單位表示 TTL 的值。這樣,值 1000 表示存在於 queue 中的當前 message 將最多只存活 1 秒鍾,除非其被投遞到 consumer 上。實參可以是以下 AMQP 類型:short-short-int 、short-int 、long-int 或者 long-long-int 。
下面的 Java 實例代碼創建了一個 queue ,且 message 在該 queue 的存活時間最大為 60 秒: :
- <span style="font-size:14px;">Map<String, Object> args = new HashMap<String, Object>();
- args.put("x-message-ttl", 60000);
- channel.queueDeclare("myqueue", false, false, false, args);</span>
另外也可以用rabbitctl的set_policy來設置:
- <span style="font-size:14px;">rabbitmqctl set_policy TTL ".*" '{"message-ttl":60000}' --apply-to queues</span>
還可以通過HTTP接口調用如下:
- <span style="font-size:14px;">$ curl -i -u guest:guest -H "content-type:application/json" -XPUT
- -d'{"auto_delete":false,"durable":true,"arguments":{"x-message-ttl": 60000}}'
- http://localhost:15672/api/queues/{vhost}/{queuename}</span>
當 message 被 requeue 的時候,其原始過期時間將被保留(例如由於設置了 requeue 參數的 AMQP 方法的使用,或者由於 channel 的關閉)。
設置 x-message-ttl 為 0 將使得,在 message 在到達 queue 之后但尚未被立即投遞到 consumer 的情況下,判定為過期。這種方式相當於 RabbitMQ server 在3.0.0以后不支持 basic.publish 中 immediate 標識情況下的等價實現。與 immediate 標幟方式不同的是,將不會有 basic.returns 命令的調用,但是在設置了 dead letter exchange 的情況下,這些 message 將被處理為 dead-lettered(詳見下面的DLX) 。
Per-Message TTL
在RabbitMQ 3.0.0以后的版本中,TTL 設置可以具體到每一條 message 本身,只要在通過 basic.publish 命令發送 message 時設置 expiration 字段。
expiration 字段以微秒為單位表示 TTL 值。且與 x-message-ttl 具有相同的約束條件。因為 expiration 字段必須為字符串類型,broker 將只會接受以字符串形式表達的數字。
當同時指定了 queue 和 message 的 TTL 值,則兩者中較小的那個才會起作用。
下面的 Java 示例 publish 了最多能在 queue 中存活 60 秒的 message :
- <span style="font-size:14px;">byte[] messageBodyBytes = "Hello, world!".getBytes();
- AMQP.BasicProperties properties = new AMQP.BasicProperties();
- properties.setExpiration("60000");
- channel.basicPublish("myexchange", "routingkey", properties, messageBodyBytes);</span>
還可以通過HTTP接口調用如下:
- <span style="font-size:14px;">$ curl -i -u guest:guest -H "content-type:application/json" -XPOST
- -d'{"properties":{"expiration":"60000"},"routing_key":"routingkey","payload":"my
- body","payload_encoding":"string"}'
- http://localhost:15672/api/exchanges/{vhost}/{exchangename}/publish</span>
雖然 consumer 從來看不到過期的 message ,但是在過期 message 到達 queue 的頭部時確實會被真正的丟棄(或者 dead-lettered )。當對每一個 queue 設置了 TTL 值時不會產生任何問題,因為過期的 message 總是會出現在 queue 的頭部。當對每一條 message 設置了 TTL 時,過期的 message 可能會排隊於未過期 message 的后面,直到這些消息被 consume 到或者過期了。在這種情況下,這些過期的 message 使用的資源將不會被釋放,且會在 queue 統計信息中被計算進去(例如,queue 中存在的 message 的數量)。
對於第一種設置隊列TTL屬性的方法,一旦消息過期,就會從隊列中抹去,而第二種方法里,即使消息過期,也不會馬上從隊列中抹去,因為每條消息是否過期時在即將投遞到消費者之前判定的,為什么兩者得處理方法不一致?因為第一種方法里,隊列中已過期的消息肯定在隊列頭部,RabbitMQ只要定期從隊頭開始掃描是否有過期消息即可,而第二種方法里,每條消息的過期時間不同,如果要刪除所有過期消息,勢必要掃描整個隊列,所以不如等到此消息即將被消費時再判定是否過期,如果過期,再進行刪除。
Queue TTL
queue.declare 命令中的 x-expires 參數控制 queue 被自動刪除前可以處於未使用狀態的時間。未使用的意思是 queue 上沒有任何 consumer ,queue 沒有被重新聲明,並且在過期時間段內未調用過 basic.get 命令。該方式可用於,例如,RPC-style 的回復 queue ,其中許多 queue 會被創建出來,但是卻從未被使用。
服務器會確保在過期時間到達后 queue 被刪除,但是不保證刪除的動作有多么的及時。在服務器重啟后,持久化的 queue 的超時時間將重新計算。
用於表示超期時間的 x-expires 參數值以微秒為單位,並且服從和 x-message-ttl 一樣的約束條件,且不能設置為 0 。所以,如果該參數設置為 1000 ,則表示該 queue 如果在 1 秒鍾之內未被使用則會被刪除。
下面的 Java 示例創建了一個 queue ,其會在 30 分鍾不使用的情況下判定為超時。
- Map<String, Object> args = new HashMap<String, Object>();
- args.put("x-expires", 1800000);
- channel.queueDeclare("myqueue", false, false, false, args);
