RabbitMQ由淺入深入門全總結(二)


寫在最前面

距離上一次發文章已經很久了,其實這段時間一直也沒有停筆,只不過在忙着找工作還有學校結課的事情,重新弄了一下博客,后面也會陸陸續續會把文章最近更新出來~

  • 這篇文章有點長,就分了兩篇
  • PS:那個Github上Java知識問答的文章也沒有停筆,最近也會陸續更新

文章目錄:

6. 進階補充

6.1 過期時間設置(TTL)

過期時間(TTL)就是對消息或者隊列設置一個時效,只有在時間范圍內才可以被被消費者接收獲取,超過過期時間后消息將自動被刪除。

注:我們主要講消息過期,在消息過期的第一種方式中,順便也就會提到隊列過期的設置方式

  1. 通過隊列屬性設置,隊列中所有消息都有相同的過期時間
  2. 對消息進行單獨設置,每條消息 TTL可以不同

兩種方法同時被使用時,以兩者過期時間 TTL 較小的那個數值為准。消息在隊列的生存時間一旦超過設置的 TTL 值,就稱為 Dead Message 被投遞到死信隊列,消費者將無法再收到該消息(死信隊列是我們下一點要講的)

6.1.1 應用於全部消息的過期時間

  • 配置類
@Configuration
public class RabbitMqConfiguration {

    public static final String TOPIC_EXCHANGE = "topic_order_exchange";
    public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
    public static final String TOPIC_ROUTINGKEY_1 = "test.*";

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    @Bean
    public Queue topicQueue1() {
        // 創建參數 Map 容器
        Map<String, Object> args = new HashMap<>();
        // 設置消息過期時間 注意此處是數值 5000 不是字符串
        args.put("x-message-ttl", 5000);
        // 設置隊列過期時間
        args.put("x-expires", 8000);
        // 在最后傳入額外參數 即這些過期信息
        return new Queue(TOPIC_QUEUE_NAME_1, true, false, false, args);
    }

    @Bean
    public Binding bindingTopic1() {
        return BindingBuilder.bind(topicQueue1())
                .to(topicExchange())
                .with(TOPIC_ROUTINGKEY_1);
    }
}
  1. 創建參數 Map 容器:類型是在 Queue 參數中所要求的,要按照要求來。
  2. 設置消息過期時間:這里設置的消息過期時間,會應用到所有消息中。
  3. 設置隊列過期時間
  4. 傳入額外參數:將上述配置好的過期時間設置,通過 Queue 傳入即可。
  • 生產者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired

    @Test
    public void testTopicSendMessage() {
        rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order.insert", "This is a message !");
    }
}

不要配置消費者,然后就可以在 Web 管理器中看到效果了

6.1.2 應用於單獨消息的過期時間

  • 配置中保持最初的樣子就行了,就不需要配置過期時間了
  • 生產者中配置消息單獨的過期時間
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired

    @Test
    public void testTopicSendMessage2() {
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor(){
            public Message postProcessMessage(Message message){
                // 注意此處是 字符串 “5000”
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };
        rabbitTemplate.convertAndSend(RabbitMqConfiguration.TOPIC_EXCHANGE, "test.order",
                "This is a message 002 !",messagePostProcessor);
    }
}

6.2 死信隊列

死信官方原文為 Dead letter ,它是RabbitMQ中的一種消息機制,當你在消費消息時,如果隊列以及隊列里的消息出現以下情況,說明當前消息就成為了 “死信”,如果配置了死信隊列,這些數據就會傳送到其中,如果沒有配置就會直接丟棄。

  1. 消息被拒絕
  2. 消息過期
  3. 隊列達到最大長度

不過死信隊列並不是什么很特殊的存在,我們只需要配置一個交換機,在消費的那個隊列中配置,出現死信就重新發送到剛才配置的交換機中去,進而被路由到與交換機綁定的隊列中去,這個隊列也就是死信隊列,所以從創建上看,它和普通的隊列沒什么區別。

6.2.1 應用場景

比如在一些比較重要的業務隊列中,未被正確消費的消息,往往我們並不想丟棄,因為丟棄后如果想恢復這些數據,往往需要運維人員從日志獲取到原消息,然后重新投遞消息,而配置了死信隊列,相當於給了未正確消費消息一個暫存的位置,日后需要恢復的時候,只需要編寫對應的代碼就可以了。

6.2.2 實現方式

  • 定義一個處理死信的交換機和隊列
@Configuration
public class DeadRabbitMqConfiguration{

    @Bean
    public DirectExchange deadDirect(){
        return new DirectExchange("dead_direct_exchange");}

    @Bean
    public Queue deadQueue(){
        return new Queue("dead_direct_queue");}
    @Bean
    public Binding deadBinds(){
        return BindingBuilder.bind(deadQueue()).to(deadDirect()).with("dead");
    }
}
  • 在正常的消費隊列中指定死信隊列
@Configuration
public class RabbitMqConfiguration {

    public static final String TOPIC_EXCHANGE = "topic_order_exchange";
    public static final String TOPIC_QUEUE_NAME_1 = "test_topic_queue_1";
    public static final String TOPIC_ROUTINGKEY_1 = "test.*";

    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange(TOPIC_EXCHANGE);
    }

    @Bean
    public Queue topicQueue1() {
        // 設置過期時間
        Map<String, Object> args = new HashMap<>();
        args.put("x-message-ttl", 5000);
        // 設置死信隊列交換器
        args.put("x-dead-letter-exchange","dead_direct_exchange");
        // 設置交換路由的路由key fanout 模式不需要配置此條
        args.put("x-dead-letter-routing-key","dead");
        return new Queue(TOPIC_QUEUE_NAME_1, true, false, false, args);
    }

    @Bean
    public Binding bindingTopic1() {
        return BindingBuilder.bind(topicQueue1())
                .to(topicExchange())
                .with(TOPIC_ROUTINGKEY_1);
    }
}

6.3 內存及磁盤監控

6.3.1 內存告警及控制

為了防止避免服務器因內存不夠而崩潰,所以 RabbitMQ 設定了一個閾值,當內存使用量超過閾值的時候,RabbitMQ 會暫時阻塞所有客戶端的連接,並且停止繼續接受新消息。

有兩種方式可以修改這個閾值

  1. 通過命令(二選一即可)
    • 命令的方式會在 Broker 重啟后失效
# 通過百分比設置的命令 <fraction> 處代表百分比小數例如 0.6
rabbitmqctl set_vm_memory_high_watermark <fraction>
# 通過絕對值設置的命令 <value> 處代表設置的一個固定值例如 700MB
rabbitmqctl set_vm_memory_high_watermark absolute <value>
  1. 通過修改配置文件 rabbitmq.conf
    • 配置文件每次啟動都會加載,屬於永久有效
# 百分比設置 默認值為 0.4 推薦 0.4-0.7 之間
vm_memory_high_watermark.relative = 0.5
# 固定值設置
vm_memory_high_watermark.absolute = 2GB

6.3.2 內存換頁

在客戶端連接和生產者被阻塞之前,它會嘗試將隊列中的消息換頁到磁盤中,這種思想在操作系統中其實非常常見,以最大程度的滿足消息的正常處理。

當內存換頁發生后,無論持久化還是非持久化的消息,都會被轉移到磁盤,而由於持久化的消息本來就在磁盤中有一個持久化的副本,所以會優先移除持久化的消息。

默認情況下,當內存達到閾值的 50 % 的時候,就會進行換頁處理。

可以通過設置 vm_memory_high_watermark_paging_ratio 修改

# 值小於 1, 如果大於 1 就沒有意義了
vm_memory_high_watermark_paging_ratio = 0.6

6.3.3 磁盤預警

如果無止境的換頁,也很有可能會導致耗盡磁盤空間導致服務器崩潰,所以 RabbitMQ 又提供了一個磁盤預警的閾值,當低於這個值的時候就會進行報警,默認是 50MB,可以通過命令的方式修改

# 固定值
rabbitmqctl set_disk_free_limit <disk_limit>
# 百分數
rabbitmqctl set_disk_free_limit memory_limit <fraction>

6.4 消息的可靠傳遞

生產者向 RabbitMQ 中發送消息的時候,可能會因為網絡等種種原因導致發送失敗,所以 RabbitMQ 提供了一系列保證消息可靠傳遞的機制,可以大致分為生產者和消費者兩部分的處理

6.4.1 生產者中的機制

生產者作為消息的發送者,需要保證自己的消息發送成功,RabbitMQ 提供了兩種方式來保證這一點。-

  1. confirm 確認模式
  2. return 退回模式

6.4.1.1 confirm 確認模式

生產者發送消息后,會異步等待接收一個 ack 應答,收到返回的 ack 確認消息后,根據 ack是 true 還是 false,調用 confirmCallback 接口進行處理

  • 配置類
spring:
  rabbitmq:
    # 發送確認
    publisher-confirm-type: correlated
  • 實現 ConfirmCallback 接口的 confirm 方法
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {

    /**
     * @param correlationData 相關配置信息
     * @param ack             exchange交換機 是否成功收到了消息。true 成功,false代表失敗
     * @param cause           失敗原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            //接收成功
            System.out.println("消息成功發送到交換機");
        } else {
            //接收失敗
            System.out.println("消息發送到交換機失敗,失敗原因: " + cause);
            // TODO 可以處理失敗的消息,例如再次發送等等
        }
    }
}
  • 聲明隊列和交換機
@Configuration
public class RabbitMqConfig {
    @Bean()
    public Queue confirmTestQueue() {
        return new Queue("confirm_test_queue", true, false, false);
    }

    @Bean()
    public FanoutExchange confirmTestExchange() {
        return new FanoutExchange("confirm_test_exchange");
    }

    @Bean
    public Binding confirmTestFanoutExchangeAndQueue() {
        return BindingBuilder.bind(confirmTestQueue()).to(confirmTestExchange());
    }
}
  • 生產者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired
    
     /**
     * 注入 ConfirmCallbackService
     */
    @Autowired
    private ConfirmCallbackService confirmCallbackService;

    @Test
    public void testConfirm() {
        // 設置確認回調類
        rabbitTemplate.setConfirmCallback(confirmCallbackService);
        // 發送消息
        rabbitTemplate.convertAndSend("confirm_test_exchange", "", "ConfirmCallback !");
    }
}

6.4.1.2 return 退回模式

當 Exchange 發送到 Queue 失敗時,會調用一個 returnsCallback,我們可以通過實現這個接口,然后來處理這種失敗的情況。

  • 在配置文件中開啟發送回調
spring:
  rabbitmq:
    # 發送回調
    publisher-returns: true
  • 實現 ReturnsCallback 的 returnedMessage 方法
//  public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) 已經屬於過時方法了
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        System.out.println(returned);
    }
}
  • 聲明隊列和交換機(Direct 模式)
@Configuration
public class RabbitMqConfig {

    @Bean()
    public Queue returnsTestQueue() {
        return new Queue("return_test_queue", true, false, false);
    }

    @Bean()
    public DirectExchange returnsTestExchange() {
        return new DirectExchange("returns_test_exchange");
    }

    @Bean
    public Binding returnsTestDirectExchangeAndQueue() {
        return BindingBuilder.bind(returnsTestQueue()).to(returnsTestExchange()).with("info");
    }
}
  • 生產者
@SpringBootTest(classes = RabbitmqSpringbootApplication.class)
@RunWith(SpringRunner.class)
public class RabbitMqTest {
    /**
     * 注入 RabbitTemplate
     */
    @Autowired
    
    /**
     * 注入 ConfirmCallbackService
     */
    @Autowired
    private ConfirmCallbackService confirmCallbackService;
    
    /**
     * 注入 ReturnCallbackService
     */
    @Autowired
    private ReturnCallbackService returnCallbackService;

    @Test
    public void testReturn() {
        // 確保消息發送失敗后可以重新返回到隊列中
        rabbitTemplate.setMandatory(true);
        // 消息投遞到隊列失敗回調處理
        rabbitTemplate.setReturnsCallback(returnCallbackService);
        // 消息投遞確認模式
        rabbitTemplate.setConfirmCallback(confirmCallbackService);
        // 發送消息
        rabbitTemplate.convertAndSend("returns_test_exchange", "info", "ReturnsCallback !");
    }
}
  • 修改不同的路由key,即可測試出結果。

6.4.2 消費者中的機制

6.4.2.1 ack 確認機制

ack 表示收到消息的確認,默認是自動確認,但是它有三種類型

acknowledge-mode 選項介紹

  • auto:自動確認,為默認選項
  • manual:手動確認(按能力分配就需要設置為手動確認)
  • none:不確認,發送后自動丟棄

其中自動確認是指,當消息一旦被消費者接收到,則自動確認收到,並把這個消息從隊列中刪除。

但是在實際業務處理中,正確的接收到的消息可能會因為業務上的問題,導致消息沒有正確的被處理,但是如果設置了 手動確認方式,則需要在業務處理成功后,調用channel.basicAck(),手動簽收,如果出現異常,則調用 channel.basicNack()方法,讓其自動重新發送消息。

  • 配置文件
spring:
  rabbitmq:
    listener:
      simple:
      	# 手動確認
        acknowledge-mode: manual 
  • 消費者
@Component
@RabbitListener(queues = "confirm_test_queue")
public class TestConsumer {

    @RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            System.out.println("消息內容: " + new String(message.getBody()));

            System.out.println("業務出錯的位置:");
            int i = 66 / 0;
            
            // 手動簽收 deliveryTag標識代表隊列可以刪除了
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            // 拒絕簽收
            channel.basicNack(deliveryTag, true, true);
        }
    }
}

6.5 集群 & 6.6 分布式事務(待更新)

由於這兩個點篇幅也不短,實在不願草草簡單寫上了事,放到后面單獨的文章編寫,發布哇。

關於集群的搭建暫時可參考:https://blog.csdn.net/belonghuang157405/article/details/83540148


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM