1.RabbitMQ TTL及死信隊列
1.1.TTL概述
過期時間TTL表示可以對消息設置預期的時間,在這個時間內都可以被消費者接收獲取;過了之后消息將自動被刪除。RabbitMQ可以對消息和隊列設置TTL。目前有兩種方法可以設置。
-
第一種方法是通過隊列屬性設置,隊列中所有消息都有相同的過期時間。
-
第二種方法是對消息進行單獨設置,每條消息TTL可以不同。
注意:
如果上述兩種方法同時使用,則消息的過期時間以兩者之間TTL較小的那個數值為准。消息在隊列的生存時間一旦超過設置的TTL值,就稱為dead message被投遞到死信隊列, 消費者將無法再收到該消息。
界面具體設置如下圖所示:
1.2.TTL簡單實現
①基於隊列屬性進行設置:
這里在springBoot-order-rabbitmq-producer項目中config目錄新建一個TTLRabbitMqConfiguration,聲明ttl交換機與ttlQueue,代碼如下:
@Configuration public class TTLRabbitMqConfiguration { //聲明交換機,不同的交換機類型不同:DirectExchange/FanoutExchange/TopicExchange/HeadersExchange @Bean public DirectExchange ttldirectOrderExchange() { return new DirectExchange("ttl_direct_exchange", true, false); } //定義隊列的過期時間 @Bean public Queue directttlQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 5000); //這里一定是int類型 return new Queue("ttl.direct.queue", true, false, false, args); } @Bean public Binding ttlBingding(){ return BindingBuilder.bind(directttlQueue()).to(ttldirectOrderExchange()).with("ttl"); } }
在OrderService中進行消息發送至消息隊列:
@Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; // 1: 定義交換機 private String exchangeName = ""; // 2: 路由key private String routeKey = ""; //ttl--死信隊列 public void makeOrderTTLQueue(Long userId, Long productId, int num) { exchangeName = "ttl_direct_exchange"; routeKey = "ttl"; // 1: 模擬用戶下單 String orderNumer = UUID.randomUUID().toString(); // 2: 根據商品id productId 去查詢商品的庫存 // int numstore = productSerivce.getProductNum(productId); // 3:判斷庫存是否充足 // if(num > numstore ){ return "商品庫存不足..."; } // 4: 下單邏輯 // orderService.saveOrder(order); // 5: 下單成功要扣減庫存 // 6: 下單完成以后 System.out.println("用戶 " + userId + ",訂單編號是:" + orderNumer); // 發送訂單信息給RabbitMQ fanout rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer); } }
進行測試:
@SpringBootTest class RabbitmqApplicationTests { @Autowired private OrderService orderService; @Test void ttlQueueTest() throws InterruptedException { for (int i = 0; i < 5; i++) { Thread.sleep(1000); Long userId = 100L + i; Long productId = 10001L + i; int num = 1; orderService.makeOrderTTLQueue(userId, productId, num); } } }
可以看到消息向隊列中發送,但是5s之后消息會自動從隊列中移除,這就是TTL消息過期移除。
②基於某個消息發送時單獨設置過期時間:
這種方式不需要在隊列與交換機綁定時設置Queue過期屬性,只需要聲明為普通隊列即可。
@Configuration public class TTLRabbitMqConfiguration { //聲明交換機,不同的交換機類型不同:DirectExchange/FanoutExchange/TopicExchange/HeadersExchange @Bean public DirectExchange ttldirectOrderExchange() { return new DirectExchange("ttl_direct_exchange", true, false); } //定義隊列的過期時間 --定義一個普通隊列,在外面設置過期時間 @Bean public Queue directttlMessageQueue() { return new Queue("ttl.message.direct.queue", true, false, false); } @Bean public Binding ttlMsgBingding(){ return BindingBuilder.bind(directttlMessageQueue()).to(ttldirectOrderExchange()).with("ttlmsg"); } }
在發送時進行單獨消息過期時間屬性設置:
@Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; // 1: 定義交換機 private String exchangeName = ""; // 2: 路由key private String routeKey = ""; //ttl--死信隊列--普通隊列設置 public void makeOrderTTLMsgQueue(Long userId, Long productId, int num) { exchangeName = "ttl_direct_exchange"; routeKey = "ttlmsg"; String orderNumer = UUID.randomUUID().toString(); System.out.println("用戶 " + userId + ",訂單編號是:" + orderNumer); //給消息設置過期時間 MessagePostProcessor postProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000"); //時間為5s message.getMessageProperties().setContentEncoding("UTF-8"); return message; } }; // 發送訂單信息給RabbitMQ fanout,指定消息的擴展信息 rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer, postProcessor); } }
進行測試:
@SpringBootTest class RabbitmqApplicationTests { @Autowired private OrderService orderService; @Test void ttlMsgQueueTest() throws InterruptedException { for (int i = 0; i < 3; i++) { Thread.sleep(1000); Long userId = 100L + i; Long productId = 10001L + i; int num = 1; orderService.makeOrderTTLMsgQueue(userId, productId, num); } } }
可以看到普通消息也可以通過設置過期時間,實現在消息隊列中進行過期移除的功能。
①與②的主要區別在於:
通過隊列設置ttl過期可以與死信隊列進行綁定,后期過期之后可以加入死信隊列;而通過單獨普通消息后期設置屬性無法加入到死信隊列中,即沒有備胎。
1.3.死信隊列
DLX,全稱為Dead-Letter-Exchange , 可以稱之為死信交換機,也有人稱之為死信郵箱。當消息在一個隊列中變成死信(dead message)之后,它能被重新發送到另一個交換機中,這個交換機就是DLX ,綁定DLX的隊列就稱之為死信隊列。 消息變成死信,可能是由於以下的原因:
-
消息被拒絕
-
消息過期
-
隊列達到最大長度
DLX也是一個正常的交換機,和一般的交換機沒有區別,它能在任何的隊列上被指定,實際上就是設置某一個隊列的屬性。當這個隊列中存在死信時,Rabbitmq就會自動地將這個消息重新發布到設置的DLX上去,進而被路由到另一個隊列,即死信隊列。
死信隊列的執行流程:
1.4.死信隊列簡單實現
①在config目錄下創建TTLRabbitMqConfiguration,聲明ttl交換機及隊列綁定關系,同時聲明死信隊列:
這里最主要的就是按照界面參數設置了死信隊列exchange及routekey:
@Configuration public class TTLRabbitMqConfiguration { //聲明交換機,不同的交換機類型不同:DirectExchange/FanoutExchange/TopicExchange/HeadersExchange @Bean public DirectExchange ttldirectOrderExchange() { return new DirectExchange("ttl_direct_exchange", true, false); } //定義隊列的過期時間 //定義隊列的死信隊列 //死信隊列的route key @Bean public Queue directttlQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-message-ttl", 5000); //這里一定是int類型 args.put("x-dead-letter-exchange", "dead_direct_exchange"); //這里與定義好的死信交換機進行綁定,死信交換機會去找死信隊列 args.put("x-dead-letter-routing-key", "dead"); //如果是fanout模式這里不需要route key args.put("x-max-length", 5); //設置每次給死信隊列中發送消息的長度 return new Queue("ttl.direct.queue", true, false, false, args); } @Bean public Binding ttlBingding(){ return BindingBuilder.bind(directttlQueue()).to(ttldirectOrderExchange()).with("ttl"); } }
②業務層調用及測試:
//ttl--死信隊列 public void makeOrderTTLQueue(Long userId, Long productId, int num) { exchangeName = "ttl_direct_exchange"; routeKey = "ttl"; String orderNumer = UUID.randomUUID().toString(); System.out.println("用戶 " + userId + ",訂單編號是:" + orderNumer); // 發送訂單信息給RabbitMQ fanout rabbitTemplate.convertAndSend(exchangeName, routeKey, orderNumer); }
測試:
@Test void ttlQueueTest() throws InterruptedException { for (int i = 0; i < 5; i++) { Thread.sleep(1000); Long userId = 100L + i; Long productId = 10001L + i; int num = 1; orderService.makeOrderTTLQueue(userId, productId, num); } }
可以看到Queue屬性中TTL、Lim相關的設置,5s過期后都加入到了死信隊列中:
2.RabbitMQ內存管控
2.1.RibbitMQ持久化
持久化就把信息寫入到磁盤的過程。
2.2.RabbitMQ內存磁盤監控
RabbitMQ的內存警告
當內存使用超過配置的閾值或者磁盤空間剩余空間對於配置的閾值時,RabbitMQ會暫時阻塞客戶端的連接,並且停止接收從客戶端發來的消息,以此避免服務器的崩潰,客戶端與服務端的心態檢測機制也會失效。 如下圖:
當出現blocking或blocked話說明到達了閾值和以及高負荷運行了。
RabbitMQ的內存控制
參考幫助文檔:https://www.rabbitmq.com/configure.html
當出現警告的時候,可以通過配置去修改和調整
①命令的方式
rabbitmqctl set_vm_memory_high_watermark <fraction>rabbitmqctl
set_vm_memory_high_watermark absolute 50MB
fraction/value 為內存閾值。默認情況是:0.4/2GB,代表的含義是:當RabbitMQ的內存超過40%時,就會產生警告並且阻塞所有生產者的連接。通過此命令修改閾值在Broker重啟以后將會失效,通過修改配置文件方式設置的閾值則不會隨着重啟而消失,但修改了配置文件一樣要重啟broker才會生效。
分析:
②配置文件方式 rabbitmq.conf
當前配置文件地址:/etc/rabbitmq/rabbitmq.conf
#默認 #vm_memory_high_watermark.relative = 0.4 # 使用relative相對值進行設置fraction,建議取值在04~0.7之間,不建議超過0.7. vm_memory_high_watermark.relative = 0.6 # 使用absolute的絕對值的方式,但是是KB,MB,GB對應的命令如下: vm_memory_high_watermark.absolute = 2GB
RabbitMQ的內存換頁
在某個Broker節點及內存阻塞生產者之前,它會嘗試將隊列中的消息換頁到磁盤以釋放內存空間,持久化和非持久化的消息都會寫入磁盤中,其中持久化的消息本身就在磁盤中有一個副本,所以在轉移的過程中持久化的消息會先從內存中清除掉。
比如有1000MB內存,當內存的使用率達到了400MB,已經達到了極限,但是因為配置的換頁內存0.5,這個時候會在達到極限400mb之前,會把內存中的200MB進行轉移到磁盤中。從而達到穩健的運行。
可以通過設置 vm_memory_high_watermark_paging_ratio
來進行調整。
vm_memory_high_watermark.relative = 0.4
vm_memory_high_watermark_paging_ratio = 0.7(設置小於1的值)
為什么設置小於1,以為你如果你設置為1的閾值。內存都已經達到了極限了。你在去換頁意義不是很大了。
RabbitMQ的磁盤預警
當磁盤的剩余空間低於確定的閾值時,RabbitMQ同樣會阻塞生產者,這樣可以避免因非持久化的消息持續換頁而耗盡磁盤空間導致服務器崩潰。
rabbitmqctl set_disk_free_limit <disk_limit> rabbitmqctl set_disk_free_limit memory_limit <fraction> disk_limit:固定單位 KB MB GB fraction :是相對閾值,建議范圍在:1.0~2.0之間。(相對於內存)
通過配置文件配置如下:
disk_free_limit.relative = 3.0 disk_free_limit.absolute = 50mb
本博客示例涉及代碼均已上傳至Github: