寫在最前面
距離上一次發文章已經很久了,其實這段時間一直也沒有停筆,只不過在忙着找工作還有學校結課的事情,重新弄了一下博客,后面也會陸陸續續會把文章最近更新出來~
- 這篇文章有點長,就分了兩篇
- PS:那個Github上Java知識問答的文章也沒有停筆,最近也會陸續更新
文章目錄:
6. 進階補充
6.1 過期時間設置(TTL)
過期時間(TTL)就是對消息或者隊列設置一個時效,只有在時間范圍內才可以被被消費者接收獲取,超過過期時間后消息將自動被刪除。
注:我們主要講消息過期,在消息過期的第一種方式中,順便也就會提到隊列過期的設置方式
- 通過隊列屬性設置,隊列中所有消息都有相同的過期時間
- 對消息進行單獨設置,每條消息 TTL可以不同
兩種方法同時被使用時,以兩者過期時間 TTL 較小的那個數值為准。消息在隊列的生存時間一旦超過設置的 TTL 值,就稱為 Dead Message 被投遞到死信隊列,消費者將無法再收到該消息(死信隊列是我們下一點要講的)
6.1.1 應用於全部消息的過期時間
- 配置類
@Configuration
public class RabbitMqConfiguration {
public static final String TOPIC_EXCHANGE = "topic_order_exchange";
public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
public static final String TOPIC_ROUTINGKEY_1 = "test.*";
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Queue topicQueue1() {
// 創建參數 Map 容器
Map<String, Object> args = new HashMap<>();
// 設置消息過期時間 注意此處是數值 5000 不是字符串
args.put("x-message-ttl", 5000);
// 設置隊列過期時間
args.put("x-expires", 8000);
// 在最后傳入額外參數 即這些過期信息
return new Queue(TOPIC_QUEUE_NAME_1, true, false, false, args);
}
@Bean
public Binding bindingTopic1() {
return BindingBuilder.bind(topicQueue1())
.to(topicExchange())
.with(TOPIC_ROUTINGKEY_1);
}
}
- 創建參數 Map 容器:類型是在 Queue 參數中所要求的,要按照要求來。
- 設置消息過期時間:這里設置的消息過期時間,會應用到所有消息中。
- 設置隊列過期時間
- 傳入額外參數:將上述配置好的過期時間設置,通過 Queue 傳入即可。
- 生產者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
/**
* 注入 RabbitTemplate
*/
@Autowired
@Test
public void testTopicSendMessage() {
rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order.insert", "This is a message !");
}
}
不要配置消費者,然后就可以在 Web 管理器中看到效果了
6.1.2 應用於單獨消息的過期時間
- 配置中保持最初的樣子就行了,就不需要配置過期時間了
- 生產者中配置消息單獨的過期時間
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
/**
* 注入 RabbitTemplate
*/
@Autowired
@Test
public void testTopicSendMessage2() {
MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
public Message postProcessMessage(Message message){
// 注意此處是 字符串 “5000”
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("UTF-8");
return message;
}
};
rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order",
"This is a message 002 !",messagePostProcessor);
}
}
6.2 死信隊列
死信官方原文為 Dead letter ,它是RabbitMQ中的一種消息機制,當你在消費消息時,如果隊列以及隊列里的消息出現以下情況,說明當前消息就成為了 “死信”,如果配置了死信隊列,這些數據就會傳送到其中,如果沒有配置就會直接丟棄。
- 消息被拒絕
- 消息過期
- 隊列達到最大長度
不過死信隊列並不是什么很特殊的存在,我們只需要配置一個交換機,在消費的那個隊列中配置,出現死信就重新發送到剛才配置的交換機中去,進而被路由到與交換機綁定的隊列中去,這個隊列也就是死信隊列,所以從創建上看,它和普通的隊列沒什么區別。
6.2.1 應用場景
比如在一些比較重要的業務隊列中,未被正確消費的消息,往往我們並不想丟棄,因為丟棄后如果想恢復這些數據,往往需要運維人員從日志獲取到原消息,然后重新投遞消息,而配置了死信隊列,相當於給了未正確消費消息一個暫存的位置,日后需要恢復的時候,只需要編寫對應的代碼就可以了。
6.2.2 實現方式
- 定義一個處理死信的交換機和隊列
@Configuration
public class DeadRabbitMqConfiguration{
@Bean
public DirectExchange deadDirect(){
return new DirectExchange("dead_direct_exchange");}
@Bean
public Queue deadQueue(){
return new Queue("dead_direct_queue");}
@Bean
public Binding deadBinds(){
return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead");
}
}
- 在正常的消費隊列中指定死信隊列
@Configuration
public class RabbitMqConfiguration {
public static final String TOPIC_EXCHANGE = "topic_order_exchange";
public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
public static final String TOPIC_ROUTINGKEY_1 = "test.*";
@Bean
public TopicExchange topicExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}
@Bean
public Queue topicQueue1() {
// 設置過期時間
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000);
// 設置死信隊列交換器
args.put("x-dead-letter-exchange","dead_direct_exchange");
// 設置交換路由的路由key fanout 模式不需要配置此條
args.put("x-dead-letter-routing-key","dead");
return new Queue(TOPIC_QUEUE_NAME_1, true, false, false, args);
}
@Bean
public Binding bindingTopic1() {
return BindingBuilder.bind(topicQueue1())
.to(topicExchange())
.with(TOPIC_ROUTINGKEY_1);
}
}
6.3 內存及磁盤監控
6.3.1 內存告警及控制
為了防止避免服務器因內存不夠而崩潰,所以 RabbitMQ 設定了一個閾值,當內存使用量超過閾值的時候,RabbitMQ 會暫時阻塞所有客戶端的連接,並且停止繼續接受新消息。
有兩種方式可以修改這個閾值
- 通過命令(二選一即可)
- 命令的方式會在 Broker 重啟后失效
# 通過百分比設置的命令 <fraction> 處代表百分比小數例如 0.6
rabbitmqctl set_vm_memory_high_watermark <fraction>
# 通過絕對值設置的命令 <value> 處代表設置的一個固定值例如 700MB
rabbitmqctl set_vm_memory_high_watermark absolute <value>
- 通過修改配置文件 rabbitmq.conf
- 配置文件每次啟動都會加載,屬於永久有效
# 百分比設置 默認值為 0.4 推薦 0.4-0.7 之間
vm_memory_high_watermark.relative = 0.5
# 固定值設置
vm_memory_high_watermark.absolute = 2GB
6.3.2 內存換頁
在客戶端連接和生產者被阻塞之前,它會嘗試將隊列中的消息換頁到磁盤中,這種思想在操作系統中其實非常常見,以最大程度的滿足消息的正常處理。
當內存換頁發生后,無論持久化還是非持久化的消息,都會被轉移到磁盤,而由於持久化的消息本來就在磁盤中有一個持久化的副本,所以會優先移除持久化的消息。
默認情況下,當內存達到閾值的 50 % 的時候,就會進行換頁處理。
可以通過設置 vm_memory_high_watermark_paging_ratio 修改
# 值小於 1, 如果大於 1 就沒有意義了
vm_memory_high_watermark_paging_ratio = 0.6
6.3.3 磁盤預警
如果無止境的換頁,也很有可能會導致耗盡磁盤空間導致服務器崩潰,所以 RabbitMQ 又提供了一個磁盤預警的閾值,當低於這個值的時候就會進行報警,默認是 50MB,可以通過命令的方式修改
# 固定值
rabbitmqctl set_disk_free_limit <disk_limit>
# 百分數
rabbitmqctl set_disk_free_limit memory_limit <fraction>
6.4 消息的可靠傳遞
生產者向 RabbitMQ 中發送消息的時候,可能會因為網絡等種種原因導致發送失敗,所以 RabbitMQ 提供了一系列保證消息可靠傳遞的機制,可以大致分為生產者和消費者兩部分的處理
6.4.1 生產者中的機制
生產者作為消息的發送者,需要保證自己的消息發送成功,RabbitMQ 提供了兩種方式來保證這一點。-
- confirm 確認模式
- return 退回模式
6.4.1.1 confirm 確認模式
生產者發送消息后,會異步等待接收一個 ack 應答,收到返回的 ack 確認消息后,根據 ack是 true 還是 false,調用 confirmCallback 接口進行處理
- 配置類
spring:
rabbitmq:
# 發送確認
publisher-confirm-type: correlated
- 實現 ConfirmCallback 接口的 confirm 方法
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
/**
* @param correlationData 相關配置信息
* @param ack exchange交換機 是否成功收到了消息。true 成功,false代表失敗
* @param cause 失敗原因
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
//接收成功
System.out.println("消息成功發送到交換機");
} else {
//接收失敗
System.out.println("消息發送到交換機失敗,失敗原因: " + cause);
// TODO 可以處理失敗的消息,例如再次發送等等
}
}
}
- 聲明隊列和交換機
@Configuration
public class RabbitMqConfig {
@Bean()
public Queue confirmTestQueue() {
return new Queue("confirm_test_queue", true, false, false);
}
@Bean()
public FanoutExchange confirmTestExchange() {
return new FanoutExchange("confirm_test_exchange");
}
@Bean
public Binding confirmTestFanoutExchangeAndQueue() {
return BindingBuilder.bind(confirmTestQueue()).to(confirmTestExchange());
}
}
- 生產者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
/**
* 注入 RabbitTemplate
*/
@Autowired
/**
* 注入 ConfirmCallbackService
*/
@Autowired
private ConfirmCallbackService confirmCallbackService;
@Test
public void testConfirm() {
// 設置確認回調類
rabbitTemplate.setConfirmCallback(confirmCallbackService);
// 發送消息
rabbitTemplate.convertAndSend("confirm_test_exchange", "", "ConfirmCallback !");
}
}
6.4.1.2 return 退回模式
當 Exchange 發送到 Queue 失敗時,會調用一個 returnsCallback,我們可以通過實現這個接口,然后來處理這種失敗的情況。
- 在配置文件中開啟發送回調
spring:
rabbitmq:
# 發送回調
publisher-returns: true
- 實現 ReturnsCallback 的 returnedMessage 方法
// public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) 已經屬於過時方法了
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback {
@Override
public void returnedMessage(ReturnedMessage returned) {
System.out.println(returned);
}
}
- 聲明隊列和交換機(Direct 模式)
@Configuration
public class RabbitMqConfig {
@Bean()
public Queue returnsTestQueue() {
return new Queue("return_test_queue", true, false, false);
}
@Bean()
public DirectExchange returnsTestExchange() {
return new DirectExchange("returns_test_exchange");
}
@Bean
public Binding returnsTestDirectExchangeAndQueue() {
return BindingBuilder.bind(returnsTestQueue()).to(returnsTestExchange()).with("info");
}
}
- 生產者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
/**
* 注入 RabbitTemplate
*/
@Autowired
/**
* 注入 ConfirmCallbackService
*/
@Autowired
private ConfirmCallbackService confirmCallbackService;
/**
* 注入 ReturnCallbackService
*/
@Autowired
private ReturnCallbackService returnCallbackService;
@Test
public void testReturn() {
// 確保消息發送失敗后可以重新返回到隊列中
rabbitTemplate.setMandatory(true);
// 消息投遞到隊列失敗回調處理
rabbitTemplate.setReturnsCallback(returnCallbackService);
// 消息投遞確認模式
rabbitTemplate.setConfirmCallback(confirmCallbackService);
// 發送消息
rabbitTemplate.convertAndSend("returns_test_exchange", "info", "ReturnsCallback !");
}
}
- 修改不同的路由key,即可測試出結果。
6.4.2 消費者中的機制
6.4.2.1 ack 確認機制
ack 表示收到消息的確認,默認是自動確認,但是它有三種類型
acknowledge-mode 選項介紹
- auto:自動確認,為默認選項
- manual:手動確認(按能力分配就需要設置為手動確認)
- none:不確認,發送后自動丟棄
其中自動確認是指,當消息一旦被消費者接收到,則自動確認收到,並把這個消息從隊列中刪除。
但是在實際業務處理中,正確的接收到的消息可能會因為業務上的問題,導致消息沒有正確的被處理,但是如果設置了 手動確認方式,則需要在業務處理成功后,調用channel.basicAck(),手動簽收,如果出現異常,則調用 channel.basicNack()方法,讓其自動重新發送消息。
- 配置文件
spring:
rabbitmq:
listener:
simple:
# 手動確認
acknowledge-mode: manual
- 消費者
@Component
@RabbitListener(queues = "confirm_test_queue")
public class TestConsumer {
@RabbitHandler
public void processHandler(String msg, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
System.out.println("消息內容: " + new String(message.getBody()));
System.out.println("業務出錯的位置:");
int i = 66 / 0;
// 手動簽收 deliveryTag標識代表隊列可以刪除了
channel.basicAck(deliveryTag, true);
} catch (Exception e) {
// 拒絕簽收
channel.basicNack(deliveryTag, true, true);
}
}
}
6.5 集群 & 6.6 分布式事務(待更新)
由於這兩個點篇幅也不短,實在不願草草簡單寫上了事,放到后面單獨的文章編寫,發布哇。
關於集群的搭建暫時可參考:https://blog.csdn.net/belonghuang157405/article/details/83540148
