在服務端的穩定系的體系質量保障中,一個是考慮在客戶端高並發的請求后,服務端如何能夠
接收所有的請求並且服務端能夠頂得住洪流的負載。這中間就需要涉及考慮調度機制和隊列機制。比
如在2022年中,西安一碼通是崩潰了又崩潰,這就是很典型的在高可用設計和穩定性體系建設方面
缺少系統化的思考。作為主流的核心中間件RabbitMQ,也是考慮到了限流的機制。
一、為什么要限流?
如果生產者批量發送消息,但是消費者接收的能力是很慢,那么就會導致堆積的MQ的消息越
來越多,最后導致無法承受,從而出現崩潰,這種情況是最不願意看到的情況。那么我們可以把限流
解釋為RabbitMQ把收到的消息沒來得及處理(可能是資源也可能是其他情況),但是這個時候生產
者推送了批量的消息過來,單個客戶端無法承受這么多的數據流量,最后結果就是崩潰,從而影響
業務端使用的客戶,給客戶造成了損失。
二、RabbitMQ限流機制
在RabbitMQ消息隊列服務器中,限流機制主要是由消費者來承載,這其實從邏輯上是很好理解
的,畢竟消費者端是作為接收端,而生產者作為發送端是不需要限流的策略機制的。在RabbitMQ中提
供了QOS的功能,也可以說是服務質量保障,它可以解決消費端存在大批量的消息的時候進行限流。使
用到的方法具體是BasicQos。使用限流的前提是手工簽收,不是自動簽收,也就是basicConsumer方
法中的autoAck的參數,需要把該參數設置為false,也就是手工簽收。
三、限流案例實現
3.1、生產端的代碼
package com.example.rabbitmq.currentLimit;
import com.rabbitmq.client.*;
public class ProducerLimit
{
private static final String exchangeName="test_qos";
private static final String routyKey="qos.test";
public static void main(String[] args) throws Exception
{
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("101.**.***.84");
connectionFactory.setPort(5672);
connectionFactory.setUsername("wuya");
connectionFactory.setPassword("java");
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String msg = "Hello RabbitMQ QOS Message";
for(int i=0;i<5;i++)
{
channel.basicPublish(exchangeName,routyKey,true,null,msg.getBytes());
}
}
}
3.2、消費端的代碼
package com.example.rabbitmq.currentLimit;
import com.example.rabbitmq.MyConsumer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConsumerLimit
{
private static final String EXCHANGE = "test_qos";
private static final String queueName="test_qos_queue";
private static final String routingKey="qos.#";
public static void main(String[] args) throws Exception
{
try{
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost("101.**.***.84");
connectionFactory.setPort(5672);
connectionFactory.setUsername("wuya");
connectionFactory.setPassword("java");
connectionFactory.setVirtualHost("/");
Connection connection=connectionFactory.newConnection();
Channel channel=connection.createChannel();
channel.exchangeDeclare(EXCHANGE,"topic",true,false,null);
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,EXCHANGE,routingKey);
channel.basicQos(0,1,false);
//設置手工簽收的方式,限流的前提是必須把autoAck設置為false
channel.basicConsume(queueName,false,new MyConsumer(channel=channel));
}catch (Exception e){
e.printStackTrace();
}
}
}
備注:在如上的代碼中,在方法basicQos中,第二個參數prefetchCount設置為1,表示的是生產者給消費者一個
消息后,等待消費者ack處理完成后,然后再接收新的消息,第三個參數global設置為false表示的是consumer級
別,而不是channel級別的消息。
3.3、自定義接收消息
package com.example.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class MyConsumer extends DefaultConsumer
{
private Channel channel;
/**
* Constructs a new instance and records its association to the passed-in channel.
*
* @param channel the channel to which this consumer is attached
*/
public MyConsumer(Channel channel)
{
super(channel);
this.channel=channel;
}
@Override
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException
{
System.err.println("---------------consumer---------------\n");
System.err.println("consumerTag:"+consumerTag);
System.err.println("envelope:"+envelope);
System.err.println("properties:"+properties);
System.err.println("the message received:"+new String(body));
// channel.basicAck(envelope.getDeliveryTag(),false);
}
}
3.4、案例結果
執行如上的代碼,可以在RabbitMQ控制台就能夠看到限流的機制,消費者返回的消息具體如下:
---------------consumer---------------
consumerTag:amq.ctag-wx7ky-PxTenMX7Q8MbeoXA
envelope:Envelope(deliveryTag=1, redeliver=false, exchange=test_qos, routingKey=qos.test)
properties:#contentHeader<basic>(content-type=null, content-encoding=null, headers=null, delivery-mode=null, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
the message received:Hello RabbitMQ QOS Message
在RabbitMQ的WEB控制台中就會顯示出存在的總的消息是5,等待中的消息是4,等待ack簽收的消息是1個,
具體如下展示:
我們針對消費者接收消息的代碼進行完善,具體如下:
package com.example.rabbitmq;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class MyConsumer extends DefaultConsumer
{
private Channel channel;
/**
* Constructs a new instance and records its association to the passed-in channel.
*
* @param channel the channel to which this consumer is attached
*/
public MyConsumer(Channel channel)
{
super(channel);
this.channel=channel;
}
@Override
public void handleDelivery(
String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body) throws IOException
{
System.err.println("---------------consumer---------------\n");
System.err.println("consumerTag:"+consumerTag);
System.err.println("envelope:"+envelope);
System.err.println("properties:"+properties);
System.err.println("the message received:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
這地方設置的是非批量的方式來接收消息,我們就可以看到再次啟動消費者的程序后,就會看到每個
消息都會有一個確認ack的過程,具體如下:
四、穩定性的保障體系思考
我一直認為作為一個質量保障工程師,在穩定系的體系建設方面的前提是我們需要懂得后端涉及到的技術,那么
知道具體的技術,我們幾可以設計對應的測試用例以及測試場景,來驗證后端這部分的穩定性,比如針對限流部分,那
么我們可以模擬生產者端,批量的往消費者端寫數據,來驗證消費者端在無法消費的情況下,它的處理邏輯以及限流機
制的合理性,這中間,不管是限流還是不限流,其實都是為客戶服務的思考路線,那么限流了,是否影響客戶的正常使用?
如果影響了,這個影響點具體是什么,有哪些,質量保障團隊都需要通過測試驗證,來拿出一個評估的數據。感謝您
的閱讀,您也可關注我的公眾號: