rabbitMq消息持久化機制,和延時隊列


1、RabbitMQ的一大特色是消息的可靠性,那么它是如何保證消息可靠性的呢?

消息持久化。可以將Queue,Exchange,Message都設置為可持久化的。為了保證RabbitMQ在退出,服務重啟或者crash等異常情況下,也不會丟失消息。

2、RabbitMQ服務異常,重啟時候怎么保證消息不丟失,持久化的實現?

1、Queue(消息隊列)的持久化是通過durable=true來實現的。 

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
//關鍵的是第二個參數設置為true,即durable=true.
channel.queueDeclare("queue.persistent.name", true, false, false, null);
/到這步僅僅是做到了消息隊列的持久化,還沒有做消息持久化。

2、Message(消息)的持久化 ,通過設置消息是持久化的標識。

//MessageProperties.PERSISTENT_TEXT_PLAIN 
channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());

3、Exchange(交換機)的持久化 。

//即在聲明的時候講durable字段設置為true即可。
channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);

3、死信隊列是什么?

1、死信隊列是一個普通的隊列,它沒有消費者,用來存儲有超時時間信息的消息,並且可以設置當消息超時(ttl),等待消息超時,將消息轉發到指定的Router隊列。

2、“死信”是RabbitMQ中的一種消息機制,當你在消費消息時,如果隊列里的消息出現以下情況:

  1. 消息被否定確認,使用 channel.basicNack 或 channel.basicReject ,並且此時requeue 屬性被設置為false
  2. 消息在隊列的存活時間超過設置的TTL時間。
  3. 消息隊列的消息數量已經超過最大隊列長度。

“死信”消息會被RabbitMQ進行特殊處理,如果配置了死信隊列信息,那么該消息將會被丟進死信隊列中,如果沒有配置,則該消息將會被丟棄。

3、延時隊列和轉發隊列是什么?

延時隊列:就是用來存放需要在指定時間被處理的元素的隊列。

轉發隊列:用來接收死信隊列超時消息,在接收到之后,消費者將消息解析,獲取queueName,body,再向所獲取的queueName隊列發送一條消息,內容為body。

分析:首先rabbitmq自己是不具備延時的功能的,除了使用官方提供的插件之外,我們還可以通過TTL(設置超時時間的方式)+ DLX(一個死信隊列) + Router(轉發隊列)來實現。

4、怎么設置消息超時時間ttl?

  TTLTTL是RabbitMQ中一個消息或者隊列的屬性,表明一條消息或者該隊列中的所有消息的最大存活時間,單位是毫秒。如果一條消息設置了TTL屬性或者進入了設置TTL屬性的隊列,那么這條消息如果在TTL設置的時間內沒有被消費,則會成為“死信”。

 

1、設置在隊列上:如果設置了隊列的TTL屬性,那么一旦消息過期,就會被隊列丟棄。

 

2、ttl可以設置在消息上:消息即使過期,也不一定會被馬上丟棄,因為消息是否過期是在即將投遞到消費者之前判定的,如果當前隊列有嚴重的消息積壓情況,則已過期的消息也許還能存活較長時間。

 

設置這個TTL值呢?有兩種方式,2種是在創建隊列的時候設置隊列的“x-message-ttl”屬性,如下:

 

//第一種這樣所有被投遞到該隊列的消息都最多不會存活超過6s。
Map<String, Object> args = new HashMap<String, Object>(); args.put("x-message-ttl", 6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, args); //第二種針對每條消息設置TTL
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("6000"); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());

 

 

4、延時隊列如何通過rabbitmq來實現呢?

第一種:

  原理:生產者生產一條延時消息,根據需要延時時間的不同,設置不同的延時隊列,每個隊列都設置了不同的TTL屬性,並綁定在同一個死信交換機中,消息過期后,根據routingkey的不同,又會被路由到不同的死信隊列中,再等待消息超時,將消息轉發到指定的Router隊列。消費者只需要監聽對應的死信隊列進行處理即可。

第二種:使用rabbitmq插件實現。

如果不能實現在消息粒度上添加TTL,並使其在設置的TTL時間及時死亡,就無法設計成一個通用的延時隊列。

https://www.cnblogs.com/shihaiming/p/11081948.html

https://www.cnblogs.com/mfrank/p/11260355.html

 

 

 

 
       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM