1.簡介
為什么要對消費端進行限流?
其實很好理解,比如我們常能接觸到的消費場景:春運期間12306火車票的搶購,雙11期間的下單等。這些場景都有一個共同點就是都會導致短暫時間內請求數激增,如果我們的Consumer
最多只支持每秒1000的QPS,而由於請求的激增導致每秒2000甚至更多的並發,此時已經遠遠超過了服務本身所能處理的閾值。如果不對消息進行限流,很可能會將服務拖垮,那將會是災難性的。實際應用場景不止於這些,接下來通過RabbitMQ來講解如果對消費端做限流措施。
2. 如何限流
2.1 引入所需依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
2.1 application.yaml
concurrency:
並發量,即Consumer
本地起concurrency
個線程去消費。
prefetch:
每個線程每次預取幾條消息進行消費。
即:Consumer
每次從Broker
取concurrency * prefetch
(unack個數)條消息緩存到本地進行消費。
如果設置unack個數為20,當消費(ack)了4條消息時服務宕機了,那么剩下的16條消息會重新回到Broker中,已確認的消息會從隊列中移除掉。
spring:
rabbitmq:
host: localhost
port: 5672
# rabbit 默認的虛擬主機
virtual-host: /
# rabbit 用戶名密碼
username: admin
password: admin123
listener:
simple:
# manual 手動確認
acknowledge-mode: manual
# 消費者每次監聽消費最小數量 (並發量)
concurrency: 3
# 消費者每次監聽消費最大數量 (並發量)
max-concurrency: 10
# 消費者每次消費的數量(unack 次數:這里感覺用次數會更容易理解)
# 即:unacked 數量為 concurrency(最小並發數) * prefetch(可以不確認的次數) = 12(未被接收確認的數量)
prefetch: 4
2.2 聲明一個簡單隊列
/**
* rabbit 快速開始
*
* @author ludangxin
* @date 2021/8/23
*/
@Configuration
public class RabbitSimpleConfig {
/**
* 設置一個簡單的隊列
*/
@Bean
public Queue queue() {
return new Queue("helloMQ");
}
}
2.3 producer
/**
* 生產者
*
* @author ludangxin
* @date 2021/8/23
*/
@Component
public class SimpleProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send() {
String context = "helloMQ " + System.currentTimeMillis();
rabbitTemplate.convertAndSend("helloMQ", context);
}
}
2.4 consumer
/**
* 消費者
*
* @author ludangxin
* @date 2021/8/23
*/
@Slf4j
@Component
@RabbitListener(queues = {"helloMQ"})
public class SimpleConsumer {
@RabbitHandler
public void process(String hello) throws InterruptedException {
log.info("Message:{} ", hello);
// 設置睡眠時間,方便通過日志信息分析問題
Thread.sleep(3000);
}
}
2.5 測試代碼
@Autowired
private SimpleProducer simpleSender;
@Test
public void hello() throws Exception {
for (int i = 0; i < 100; i++) {
simpleSender.send();
}
// 阻塞進程,使消費者能夠正常監聽消費。
System.in.read();
}
我們首先分析一下:
我們在測試的時候發送了100
條消息,在項目配置的時候unack
個數設置了3 * 4 = 12,也就是說我們一次只從Broker
拉取12條信息進行消費。
當消費者進行消費的時候我們設置了3秒的延遲,還有很重要一點,我們在消費的時候沒有進行ack。也就是說,當我們消費完12條信息后,並沒有進行ack,會導致Consumer
並不會從Broker
繼續拉取消息,另一方面也能說明,Consumer
確實只從Broker
拉取了12條消息。
輸出日志如下,每三秒輸出三條信息,輸出完12條后再沒有輸出日志,證實了猜想。