RabbitMQ持久化機制、內存磁盤控制(四)


一、持久化

如果看到這一篇文章的朋友,都是有經驗的開發人員,對持久化的概念就不用再做過多的解析了,經過前面的幾篇文章,其實不難發現RabbitMQ 的持久化其實就只分交換器持久化、隊列持久化和消息持久化這三個部分;

  • 定義持久化交換器,通過第三個參數 durable 開啟/關閉持久化
channel.exchangeDeclare(exchangeName, exchangeType, durable)
  • 定義持久化隊列,通過第二個參數 durable 開啟/關閉持久化
channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
  • 發送持久化消息,需要在消息屬性中設置 deliveryMode=2 , 此屬性在 BasicProperties 中,通過 basicPublish 方法的 props 參數傳入。
channel.basicPublish(exchange, routingKey, props, body);
BasicProperties 對象可以從RabbitMQ 內置的 MessageProperties 類中獲取
MessageProperties.PERSISTENT_TEXT_PLAIN 1
如果還需要設置其它屬性,可以通過 AMQP.BasicProperties.Builder 去構建一個BasicProperties 對象;這個用法在前兩篇文章中都有展示過
new AMQP.BasicProperties.Builder() .deliveryMode(2) .build()

二、持久化代碼演示

/**
 * 持久化示例
 */
public class Consumer {
    private static Runnable receive = new Runnable() {
        public void run() {
            // 1、創建連接工廠
            ConnectionFactory factory = new ConnectionFactory();
            // 2、設置連接屬性
            factory.setHost("192.168.0.1");
            factory.setUsername("admin");
            factory.setPassword("admin");

            Connection connection = null;
            Channel channel = null;
            final String clientName = Thread.currentThread().getName();
            String queueName = "routing_test_queue";

            try {
                // 3、從連接工廠獲取連接
                connection = factory.newConnection("消費者-" + clientName);

                // 4、從鏈接中創建通道
                channel = connection.createChannel();

                // 定義一個持久化的,direct類型交換器
                channel.exchangeDeclare("routing_test", "direct", true);
                // 定義一個持久化隊列
                channel.queueDeclare(queueName, true, false, false, null);

                // 將隊列和交換器綁定,第三個參數 routingKey是關鍵,通過此路由鍵決定接收誰的消息
                channel.queueBind(queueName, "routing_test", clientName);

                // 定義消息接收回調對象
                DeliverCallback callback = new DeliverCallback() {
                    public void handle(String consumerTag, Delivery message) throws IOException {
                        System.out.println(clientName + " 收到消息:" + new String(message.getBody(), "UTF-8"));
                    }
                };
                // 監聽隊列
                channel.basicConsume(queueName, true, callback, new CancelCallback() {
                    public void handle(String consumerTag) throws IOException {
                    }
                });

                System.out.println(clientName + " 開始接收消息");
                System.in.read();

            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            } finally {
                // 8、關閉通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    } catch (TimeoutException e) {
                        e.printStackTrace();
                    }
                }

                // 9、關閉連接
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

    public static void main(String[] args) {
        new Thread(receive, "c1").start();
        new Thread(receive, "c2").start();
    }
}
public class Producer {

    public static void main(String[] args) {
        // 1、創建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        // 2、設置連接屬性
        factory.setHost("192.168.0.1");
        factory.setUsername("admin");
        factory.setPassword("admin");

        Connection connection = null;
        Channel channel = null;

        try {
            // 3、從連接工廠獲取連接
            connection = factory.newConnection("生產者");

            // 4、從鏈接中創建通道
            channel = connection.createChannel();

            // 定義一個持久化的,direct類型交換器
            channel.exchangeDeclare("routing_test", "direct", true);

            // 內存、磁盤預警時用
            System.out.println("按回車繼續");
            System.in.read();

            // 消息內容
            String message = "Hello A";
            // 發送持久化消息到routing_test交換器上
            channel.basicPublish("routing_test", "c1", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println("消息 " + message + " 已發送!");

            // 消息內容
            message = "Hello B";
            // 發送持久化消息到routing_test交換器上
            channel.basicPublish("routing_test", "c2", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
            System.out.println("消息 " + message + " 已發送!");

            // 內存、池畔預警時用
            System.out.println("按回車結束");
            System.in.read();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        } finally {
            // 7、關閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (TimeoutException e) {
                    e.printStackTrace();
                }
            }

            // 8、關閉連接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
}

二、內存告警

默認情況下 set_vm_memory_high_watermark 的值為 0.4,即內存閾值(臨界值)為 0.4,表示當RabbitMQ 使用的內存超過 40%時,就會產生內存告警並阻塞所有生產者的連接。一旦告警被解除(有消息被消費或者從內存轉儲到磁盤等情況的發生), 一切都會恢復正常。在出現內存告警后,所有的客戶端連接都會被阻塞。阻塞分為 blocking 和 blocked 兩種。
  • blocking:表示沒有發送消息的鏈接。
  • blocked:表示試圖發送消息的鏈接。
如果出現了內存告警,並且機器還有可用內存,可以通過命令調整內存閾值,解除告警。
rabbitmqctl set_vm_memory_high_watermark 1 1
或者
rabbitmqctl set_vm_memory_high_watermark absolute 1GB
但這種方式只是臨時調整,RabbitMQ 服務重啟后,會還原。如果需要永久調整,可以修改配置文件。但修改配置文件需要重啟RabbitMQ 服務才能生效。
修改配置文件: vim /etc/rabbitmq/rabbitmq.conf
vm_memory_high_watermark.relative = 0.4 1
或者
vm_memory_high_watermark.absolute = 1GB

三、模擬內存告警

1. 調整內存閾值,模擬出告警,在RabbitMQ 服務器上修改。 注意:修改之前,先在管理頁面看一下當前使用了多少,調成比當前值小
rabbitmqctl set_vm_memory_high_watermark absolute 50MB

2.刷新管理頁面(可能需要刷新多次),在 Overview -> Nodes 中可以看到Memory變成了紅色,表示此節點內存告警了

3. 啟動 Producer 和 Consumer(源碼鏈接在最下面)
4. 查看管理界面的 Connections 頁面,可以看到生產者和消費者的鏈接都處於 blocking 狀態。
5. 在 Producer 的控制台按回車健,再觀察管理界面的 Connections 頁面,會發現生產者的狀態成了 blocked 。
6. 此時雖然在 Producer 控制台看到了發送兩條消息的信息,但 Consumer 並沒有收到任何消息。並且在管理界面的 Queues 頁面也看到不到隊列的消息數量有變化。
7. 解除內存告警后,會發現 Consumer 收到了 Producer 發送的兩條消息。

四、內存換頁

  • 在Broker節點的使用內存即將達到內存閾值之前,它會嘗試將隊列中的消息存儲到磁盤以釋放內存空間,這個動作叫內存換頁。
  • 持久化和非持久化的消息都會被轉儲到磁盤中,其中持久化的消息本身就在磁盤中有一份副本,此時會將持久化的消息從內存中清除掉。
  • 默認情況下,在內存到達內存閾值的 50%時會進行換頁動作。也就是說,在默認的內存閾值為 0.4的情況下,當內存超過 0.4 x 0 .5=0.2 時會進行換頁動作。
  • 通過修改配置文件,調整內存換頁分頁閾值(不能通過命令調整)。
# 此值大於1時,相當於禁用了換頁功能。
 vm_memory_high_watermark_paging_ratio = 0.75

五、磁盤告警

  • 當磁盤剩余空間低於磁盤的閾值時,RabbitMQ 同樣會阻塞生產者,這樣可以避免因非持久化的消息持續換頁而耗盡磁盤空間導致服務崩潰
  • 默認情況下,磁盤閾值為50MB,表示當磁盤剩余空間低於50MB 時會阻塞生產者並停止內存中消息的換頁動作
  • 這個閾值的設置可以減小,但不能完全消除因磁盤耗盡而導致崩漬的可能性。比如在兩次磁盤空間檢測期間內,磁盤空間從大於50MB被耗盡到0MB
  • 通過命令可以調整磁盤閾值,臨時生效,重啟恢復
# disk_limit 為固定大小,單位為MB、GB
 rabbitmqctl set_disk_free_limit <disk_limit>
或者
# fraction 為相對比值,建議的取值為1.0~2.0之間
 rabbitmqctl set_disk_free_limit mem_relative <fraction>

其實這些內容在官網上都有說明,有興趣可以直接看官網:https://www.rabbitmq.com/alarms.html

  git源碼:https://gitee.com/TongHuaShuShuoWoDeJieJu/rabbit.git


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM