1、RabbitMQ的一大特色是消息的可靠性,那么它是如何保證消息可靠性的呢?
消息持久化。可以將Queue,Exchange,Message都設置為可持久化的。為了保證RabbitMQ在退出,服務重啟或者crash等異常情況下,也不會丟失消息。
2、RabbitMQ服務異常,重啟時候怎么保證消息不丟失,持久化的實現?
1、Queue(消息隊列)的持久化是通過durable=true來實現的。
Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //關鍵的是第二個參數設置為true,即durable=true. channel.queueDeclare("queue.persistent.name", true, false, false, null); /到這步僅僅是做到了消息隊列的持久化,還沒有做消息持久化。
2、Message(消息)的持久化 ,通過設置消息是持久化的標識。
//MessageProperties.PERSISTENT_TEXT_PLAIN channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());
3、Exchange(交換機)的持久化 。
//即在聲明的時候講durable字段設置為true即可。 channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);
3、死信隊列是什么?
1、死信隊列是一個普通的隊列,它沒有消費者,用來存儲有超時時間信息的消息,並且可以設置當消息超時(ttl),等待消息超時,將消息轉發到指定的Router隊列。
2、“死信”是RabbitMQ中的一種消息機制,當你在消費消息時,如果隊列里的消息出現以下情況:
- 消息被否定確認,使用
channel.basicNack
或channel.basicReject
,並且此時requeue
屬性被設置為false
。 - 消息在隊列的存活時間超過設置的TTL時間。
- 消息隊列的消息數量已經超過最大隊列長度。
“死信”消息會被RabbitMQ進行特殊處理,如果配置了死信隊列信息,那么該消息將會被丟進死信隊列中,如果沒有配置,則該消息將會被丟棄。
3、延時隊列和轉發隊列是什么?
延時隊列:就是用來存放需要在指定時間被處理的元素的隊列。
轉發隊列:用來接收死信隊列超時消息,在接收到之后,消費者將消息解析,獲取queueName,body,再向所獲取的queueName隊列發送一條消息,內容為body。
分析:首先rabbitmq自己是不具備延時的功能的,除了使用官方提供的插件之外,我們還可以通過TTL(設置超時時間的方式)+ DLX(一個死信隊列) + Router(轉發隊列)來實現。
4、怎么設置消息超時時間ttl?
TTL
:TTL
是RabbitMQ中一個消息或者隊列的屬性,表明一條消息或者該隊列中的所有消息的最大存活時間
,單位是毫秒。如果一條消息設置了TTL屬性或者進入了設置TTL屬性的隊列,那么這條消息如果在TTL設置的時間內沒有被消費,則會成為“死信”。
1、設置在隊列上:如果設置了隊列的TTL屬性,那么一旦消息過期,就會被隊列丟棄。
2、ttl可以設置在消息上:消息即使過期,也不一定會被馬上丟棄,因為消息是否過期是在即將投遞到消費者之前判定的,如果當前隊列有嚴重的消息積壓情況,則已過期的消息也許還能存活較長時間。
設置這個TTL值呢?有兩種方式,2種是在創建隊列的時候設置隊列的“x-message-ttl”屬性,如下:
//第一種這樣所有被投遞到該隊列的消息都最多不會存活超過6s。
Map<String, Object> args = new HashMap<String, Object>(); args.put("x-message-ttl", 6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, args); //第二種針對每條消息設置TTL
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("6000"); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
4、延時隊列如何通過rabbitmq來實現呢?
第一種:
原理:生產者生產一條延時消息,根據需要延時時間的不同,設置不同的延時隊列,每個隊列都設置了不同的TTL屬性,並綁定在同一個死信交換機中,消息過期后,根據routingkey的不同,又會被路由到不同的死信隊列中,再等待消息超時,將消息轉發到指定的Router隊列。消費者只需要監聽對應的死信隊列進行處理即可。
第二種:使用rabbitmq插件實現。
如果不能實現在消息粒度上添加TTL,並使其在設置的TTL時間及時死亡,就無法設計成一個通用的延時隊列。
https://www.cnblogs.com/shihaiming/p/11081948.html
https://www.cnblogs.com/mfrank/p/11260355.html