TTL:Time To Live的簡稱,即過期時間。RabbitMQ可以對消息和隊列設置TTL。
設置消息的TTL
目前有兩種方法設置消息的TTL,第一種方法是通過隊列的屬性設置,隊列中的所有消息都有相同的過期時間。第二種方法是對消息本身進行單獨設置,每條消息的TTL可以不同。如果兩種方法一起使用那么消息的TTL以兩者之間較小的那個數值為准。消息在隊列中的生存時間一旦超過設置的TTL值時,就會變成“死信”(Dead Message),消費者將無法再收到該消息(這點不是絕對的)。
通過隊列的屬性設置
通過隊列屬性設置消息的TTL方法是在channel.queueDeclare方法中加入x-message-ttl參數實現的,這個參數的單位是毫秒(ms)。示例部分代碼如下:
Map<String,Object> arg = new HashMap<String, Object>(); arg.put("x-message-ttl",6000); channel.queueDeclare("normalQueue",true,false,false,arg);
同時也可以通過Policy的方式來設置TTL,示例如下:
rabbitmqctl set_policy TTL ".*" '{''message-ttl":6000}' --apply-to queues
如果不設置TTL,則表示此消息不會過期;如果設置為0,則表示除非此時可以直接將消息投遞給消費者,否則該消息會被立即丟棄。
每條消息設置TTL
針對每條消息設置TTL的方法是在channel.basicPublish方法中加入expiration的屬性參數,單位為毫秒。關鍵代碼如下:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.deliveryMode(2);//持久化消息 builder.expiration("6000");//設置過期時間 AMQP.BasicProperties properties = builder.build(); channel.basicPublish("exchange","routingkey",true,properties,"hello".getBytes());
對於通過隊列設置的TTL屬性的方法,一旦消息過期,就會從隊列中抹去,而對於每條消息的設置TTL屬性,即使消息過期,也不會馬上從隊列中抹去,因為每條消息是否過期是在即將投遞到消費者之前判斷的。
為什么這兩種方式的處理不一樣呢?
因為在通過隊列設置的TTL屬性的方法里,對於已過期的消息肯定是在隊列的頭部,RabbitMQ只是需要定期從隊列頭開始掃描是否有過期消息即可。而在消息設置TTL的方法里,每條消息的過期時間不同,如果要刪除所有的過期消息勢必要掃描整個隊列,所以不如等到此消息即將被消費時再判斷是否過期,如果過期再進行刪除即可。
設置隊列的TTL
通過channel.queueDeclare方法中的x-expires參數可以控制隊列被自動刪除前處於未使用狀態的時間。未使用的意思是隊列上沒有任何的消費者,隊列也沒有被重新聲明,並且在過期時間段內也未調用過Basic.Get命令。
RabbitMQ會確保在過期時間到達后將隊列刪除,但是不保障刪除的動作有多及時。在RabbitMQ重啟后,持久化的隊列的過期時間會被重新計算。
用於表示過期時間的x-expires參數以毫秒為單位,並且服從和x-message-ttl一樣的約束條件,不過不能設置為0,比如該參數設置為1000,則表示該隊列如果在1秒內未使用則會被刪除。下面舉例一個創建過期時間為30分鍾的隊列:
Map<String,Object> ars = new HashMap<String, Object>(); ars.put("x-expires",1800000); channel.queueDeclare("myqueue",false,false,false,ars);
死信隊列
DLX,全稱為Dead-Letter-Exchange,可以稱之為死信交換器,也有人稱為死信郵箱。當消息在一個隊列中變為死信之后,它能被重新被發送到另一個交換器中,這個交換器就是DLX,綁定DLX的隊列就稱之為死信隊列。
消息變成死信一般是由以下幾種情況造成的:
❤ 消息被拒絕,並且設置requeue參數為false
❤ 消息過期
❤ 隊列達到最大長度
DLX也是一個正常的交換器,和一般的交換器沒有區別,它能在任何的隊列上被指定,實際上就是設置某個隊列的屬性。當這個隊列存在死信時,RabbitMQ就會自動的將這個消息重新發布到設置的DLX上去,進而被路由到另一個隊列,即死信隊列。可以監聽這個隊列中的消息以進行相應的處理。
方法:
通過在channel.queueDeclare方法中設置x-dead-letter-exchange參數來為這個隊列添加DLX。示例代碼如下:
channel.exchangeDeclare("dlx_exchange","direct"); Map<String,Object> ars = new HashMap<String, Object>(); ars.put("x-dead-letter-exchange","dlx_exchange"); //可以為這個DLX指定路由鍵,如果沒有特殊指定,則使用原隊列的路由鍵 ars.put("x-dead-letter-routing-key","dlx-routing-key"); //為隊列添加DLX channel.queueDeclare("myqueue",false,false,false,ars);
通過Policy的方式設置:
rabbitmqctl set_policy DLX ".*" '{"dead-letter-change":"dlx_exchange"}' --apply-to queues
下面舉個例子,為其設置TTL和DLX,如下:
channel.exchangeDeclare("exchange.dlx","direct",true); channel.exchangeDeclare("exchange.normal","fanout",true); Map<String,Object> ars = new HashMap<String, Object>(); ars.put("x-dead-letter-exchange","exchange.dlx"); //可以為這個DLX指定路由鍵,如果沒有特殊指定,則使用原隊列的路由鍵 ars.put("x-dead-letter-routing-key","dlx-routing-key"); ars.put("x-message-ttl",10000); //為隊列添加DLX channel.queueDeclare("queue.normal",true,false,false,ars); channel.queueBind("queue.normal","exchange.normal",""); channel.queueDeclare("queue.dlx",true,false,false,null); channel.queueBind("queue.dlx","exchange.dlx","dlx-routing-key"); channel.basicPublish("exchange.normal","rk",MessageProperties.PERSISTENT_TEXT_PLAIN,"dlx".getBytes());
上述例子分別創建了兩個交換器exchange.normal和exchange.dlx,分別綁定了兩個隊列queue.normal和queue.dlx。
上述代碼執行過程:
生產者首先發送一條攜帶路由鍵為“rk”的消息,然后經過交換器exchange.normal順利的存儲到隊列queue.normal中,由於隊列queue.normal設置了過期時間為10s,在這10s內沒有消費者消費這條消息,那么判定這條消息過期。由於設置了DLX,過期之時,消息會被丟給交換器excange.dlx中,這時找到與exchange.dlx匹配的隊列queue.dlx,最后消息被存儲在queue.dlx這個死信隊列中。
對於RabbitMQ來說,DLX是一個非常有用的特性。它可以處理異常的情況下,消息不能夠正確的被消費者消費(消費者調用了Basix.Nack或者Basic.Reject)而被置於死信隊列中的情況,后續分析過程可以通過消費這個死信隊列中的內容來分析當時所遇到的異常情況,進而改善和優化系統。
延遲隊列
延遲隊列所存儲的對象是對應的延遲消息,所謂“延遲消息”是指當消息發送后,並不想讓消費者立即拿到消息,而是等待特定時間后,消費者才能拿到這個消息進行消費。
在AMQP協議中,或者RabbitMQ本身沒有直接支持延遲隊列的功能,但是可以通過DLX和TTL模擬出延遲隊列的功能。
其實在文章的上半段中的最后一個例子就是延遲隊列的用法:對於queue.dlx這個死信隊列來講。假設一個應用中需要將每條消息設置為10秒的延遲,生產者通過exchange.normal這個交換器將發送的消息存儲在queue.normal這個隊列中。消費者訂閱的並非是queue.normal這個隊列,而是queue.dlx這個隊列。當消息從queue.normal這個消息隊列中過期之后被存入queue.dlx這個隊列中,消費者就恰好延遲10秒接收到這個消息。
在真實的應用中,對於延遲隊列可以根據延遲時間的長短分為多個等級,一般為5秒,10秒,30秒,1分鍾,5分鍾,10分鍾,30分鍾,一個小時這幾個維度。
參考:《RabbitMQ實戰指南》 朱忠華 編著;