RabbitMQ限流機制(五)


       在服務端的穩定系的體系質量保障中,一個是考慮在客戶端高並發的請求后,服務端如何能夠

接收所有的請求並且服務端能夠頂得住洪流的負載。這中間就需要涉及考慮調度機制和隊列機制。比

如在2022年中,西安一碼通是崩潰了又崩潰,這就是很典型的在高可用設計和穩定性體系建設方面

缺少系統化的思考。作為主流的核心中間件RabbitMQ,也是考慮到了限流的機制。

一、為什么要限流?

         如果生產者批量發送消息,但是消費者接收的能力是很慢,那么就會導致堆積的MQ的消息越

來越多,最后導致無法承受,從而出現崩潰,這種情況是最不願意看到的情況。那么我們可以把限流

解釋為RabbitMQ把收到的消息沒來得及處理(可能是資源也可能是其他情況),但是這個時候生產

者推送了批量的消息過來,單個客戶端無法承受這么多的數據流量,最后結果就是崩潰,從而影響

業務端使用的客戶,給客戶造成了損失。

二、RabbitMQ限流機制

         在RabbitMQ消息隊列服務器中,限流機制主要是由消費者來承載,這其實從邏輯上是很好理解

的,畢竟消費者端是作為接收端,而生產者作為發送端是不需要限流的策略機制的。在RabbitMQ中提

供了QOS的功能,也可以說是服務質量保障,它可以解決消費端存在大批量的消息的時候進行限流。使

用到的方法具體是BasicQos。使用限流的前提是手工簽收,不是自動簽收,也就是basicConsumer方

法中的autoAck的參數,需要把該參數設置為false,也就是手工簽收。

三、限流案例實現

3.1、生產端的代碼

      

package com.example.rabbitmq.currentLimit;

import com.rabbitmq.client.*;

public class ProducerLimit
{
    private  static  final  String exchangeName="test_qos";
    private  static  final  String routyKey="qos.test";

    public static void main(String[] args) throws  Exception
    {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("101.**.***.84");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("wuya");
        connectionFactory.setPassword("java");
        connectionFactory.setVirtualHost("/");

        Connection connection = connectionFactory.newConnection();
        Channel channel = connection.createChannel();


        String msg = "Hello RabbitMQ QOS Message";

        for(int i=0;i<5;i++)
        {
            channel.basicPublish(exchangeName,routyKey,true,null,msg.getBytes());
        }

    }
}

3.2、消費端的代碼

package com.example.rabbitmq.currentLimit;

import com.example.rabbitmq.MyConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class ConsumerLimit
{
    private static final String EXCHANGE = "test_qos";
    private  static  final String queueName="test_qos_queue";
    private  static  final  String routingKey="qos.#";

    public static void main(String[] args) throws  Exception
    {
        try{
            ConnectionFactory connectionFactory=new ConnectionFactory();
            connectionFactory.setHost("101.**.***.84");
            connectionFactory.setPort(5672);
            connectionFactory.setUsername("wuya");
            connectionFactory.setPassword("java");
            connectionFactory.setVirtualHost("/");

            Connection connection=connectionFactory.newConnection();
            Channel channel=connection.createChannel();
            channel.exchangeDeclare(EXCHANGE,"topic",true,false,null);
            channel.queueDeclare(queueName,true,false,false,null);
            channel.queueBind(queueName,EXCHANGE,routingKey);

            channel.basicQos(0,1,false);
            //設置手工簽收的方式,限流的前提是必須把autoAck設置為false
            channel.basicConsume(queueName,false,new MyConsumer(channel=channel));
        }catch (Exception e){
            e.printStackTrace();
        }
    }
}

備注:在如上的代碼中,在方法basicQos中,第二個參數prefetchCount設置為1,表示的是生產者給消費者一個

消息后,等待消費者ack處理完成后,然后再接收新的消息,第三個參數global設置為false表示的是consumer級

別,而不是channel級別的消息。

3.3、自定義接收消息

package com.example.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer
{
    private  Channel channel;

    /**
     * Constructs a new instance and records its association to the passed-in channel.
     *
     * @param channel the channel to which this consumer is attached
     */
    public MyConsumer(Channel channel)
    {
        super(channel);
        this.channel=channel;
    }

    @Override
    public void handleDelivery(
            String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) throws IOException
    {
        System.err.println("---------------consumer---------------\n");
        System.err.println("consumerTag:"+consumerTag);
        System.err.println("envelope:"+envelope);
        System.err.println("properties:"+properties);
        System.err.println("the message received:"+new String(body));

//        channel.basicAck(envelope.getDeliveryTag(),false);
    }
}

3.4、案例結果

         執行如上的代碼,可以在RabbitMQ控制台就能夠看到限流的機制,消費者返回的消息具體如下:

---------------consumer---------------

consumerTag:amq.ctag-wx7ky-PxTenMX7Q8MbeoXA
envelope:Envelope(deliveryTag=1, redeliver=false, exchange=test_qos, routingKey=qos.test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
the message received:Hello RabbitMQ QOS Message

在RabbitMQ的WEB控制台中就會顯示出存在的總的消息是5,等待中的消息是4,等待ack簽收的消息是1個,

具體如下展示:

我們針對消費者接收消息的代碼進行完善,具體如下:

package com.example.rabbitmq;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;

public class MyConsumer extends DefaultConsumer
{
    private  Channel channel;

    /**
     * Constructs a new instance and records its association to the passed-in channel.
     *
     * @param channel the channel to which this consumer is attached
     */
    public MyConsumer(Channel channel)
    {
        super(channel);
        this.channel=channel;
    }

    @Override
    public void handleDelivery(
            String consumerTag,
            Envelope envelope,
            AMQP.BasicProperties properties,
            byte[] body) throws IOException
    {
        System.err.println("---------------consumer---------------\n");
        System.err.println("consumerTag:"+consumerTag);
        System.err.println("envelope:"+envelope);
        System.err.println("properties:"+properties);
        System.err.println("the message received:"+new String(body));
        channel.basicAck(envelope.getDeliveryTag(),false);

    }
}

這地方設置的是非批量的方式來接收消息,我們就可以看到再次啟動消費者的程序后,就會看到每個

消息都會有一個確認ack的過程,具體如下:

四、穩定性的保障體系思考

          我一直認為作為一個質量保障工程師,在穩定系的體系建設方面的前提是我們需要懂得后端涉及到的技術,那么

知道具體的技術,我們幾可以設計對應的測試用例以及測試場景,來驗證后端這部分的穩定性,比如針對限流部分,那

么我們可以模擬生產者端,批量的往消費者端寫數據,來驗證消費者端在無法消費的情況下,它的處理邏輯以及限流機

制的合理性,這中間,不管是限流還是不限流,其實都是為客戶服務的思考路線,那么限流了,是否影響客戶的正常使用?

如果影響了,這個影響點具體是什么,有哪些,質量保障團隊都需要通過測試驗證,來拿出一個評估的數據。感謝您

的閱讀,您也可關注我的公眾號:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM