RabbitMQ-限流


1.簡介

為什么要對消費端進行限流?

其實很好理解,比如我們常能接觸到的消費場景:春運期間12306火車票的搶購,雙11期間的下單等。這些場景都有一個共同點就是都會導致短暫時間內請求數激增,如果我們的Consumer最多只支持每秒1000的QPS,而由於請求的激增導致每秒2000甚至更多的並發,此時已經遠遠超過了服務本身所能處理的閾值。如果不對消息進行限流,很可能會將服務拖垮,那將會是災難性的。實際應用場景不止於這些,接下來通過RabbitMQ來講解如果對消費端做限流措施。

2. 如何限流

2.1 引入所需依賴

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit-test</artifactId>
    <scope>test</scope>
</dependency>

2.1 application.yaml

concurrency:並發量,即Consumer本地起concurrency個線程去消費。

prefetch:每個線程每次預取幾條消息進行消費。

即:Consumer每次從Brokerconcurrency * prefetch(unack個數)條消息緩存到本地進行消費。

如果設置unack個數為20,當消費(ack)了4條消息時服務宕機了,那么剩下的16條消息會重新回到Broker中,已確認的消息會從隊列中移除掉。

spring:
  rabbitmq:
    host: localhost
    port: 5672
    # rabbit 默認的虛擬主機
    virtual-host: /
    # rabbit 用戶名密碼
    username: admin
    password: admin123
    listener:
      simple:
        # manual 手動確認
        acknowledge-mode: manual
        # 消費者每次監聽消費最小數量 (並發量)
        concurrency: 3
        # 消費者每次監聽消費最大數量 (並發量)
        max-concurrency: 10
        # 消費者每次消費的數量(unack 次數:這里感覺用次數會更容易理解)
        # 即:unacked 數量為 concurrency(最小並發數) * prefetch(可以不確認的次數) = 12(未被接收確認的數量)
        prefetch: 4

2.2 聲明一個簡單隊列

/**
 * rabbit 快速開始
 *
 * @author ludangxin
 * @date 2021/8/23
 */
@Configuration
public class RabbitSimpleConfig {

    /**
     * 設置一個簡單的隊列
     */
    @Bean
    public Queue queue() {
        return new Queue("helloMQ");
    }
}

2.3 producer

/**
 * 生產者
 *
 * @author ludangxin
 * @date 2021/8/23
 */
@Component
public class SimpleProducer {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send() {
        String context = "helloMQ " + System.currentTimeMillis();
        rabbitTemplate.convertAndSend("helloMQ", context);
    }
}

2.4 consumer

/**
 * 消費者
 *
 * @author ludangxin
 * @date 2021/8/23
 */
@Slf4j
@Component
@RabbitListener(queues = {"helloMQ"})
public class SimpleConsumer {

    @RabbitHandler
    public void process(String hello) throws InterruptedException {
        log.info("Message:{} ", hello);
        // 設置睡眠時間,方便通過日志信息分析問題
        Thread.sleep(3000);
    }

}

2.5 測試代碼

@Autowired
private SimpleProducer simpleSender;

@Test
public void hello() throws Exception {
    for (int i = 0; i < 100; i++) {
        simpleSender.send();
    }
    // 阻塞進程,使消費者能夠正常監聽消費。
    System.in.read();
}

我們首先分析一下:

我們在測試的時候發送了100條消息,在項目配置的時候unack個數設置了3 * 4 = 12,也就是說我們一次只從Broker拉取12條信息進行消費。

當消費者進行消費的時候我們設置了3秒的延遲,還有很重要一點,我們在消費的時候沒有進行ack。也就是說,當我們消費完12條信息后,並沒有進行ack,會導致Consumer並不會從Broker繼續拉取消息,另一方面也能說明,Consumer確實只從Broker拉取了12條消息。

輸出日志如下,每三秒輸出三條信息,輸出完12條后再沒有輸出日志,證實了猜想。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM