springboot集成rabbitmq商品秒殺業務實戰(流量削峰)


消息隊列如何實現流量削峰?

要對流量進行削峰,最容易想到的解決方案就是用消息隊列來緩沖瞬時流量,把同步的直接調用轉換成異步的間接推送,中間通過一個隊列在一端承接瞬時的流量洪峰,在另一端平滑地將消息推送出去。
在這里插入圖片描述

這里就不講springbootrabbitmq如何集成了,參考文章https://www.cnblogs.com/fantongxue/p/12493497.html

一,准備工作:

數據庫有一張商品表,庫存量是100。現在有1000個消費者准備開搶這100個庫存。
t_product表維護商品編號與商品庫存剩余數量。編號No123321的這種商品的庫存量有100個。
在這里插入圖片描述
t_product_record維護搶到商品的用戶ID。理論上t_product表開搶后的 記錄數量應該是100條(共有100個人搶到了商品)。
在這里插入圖片描述
我們使用壓力測試工具jweter對其進行並發性測試。

二,springboot開始集成rabbitmq

1,加入amqp的依賴

	<dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit</artifactId>
        </dependency>

2,配置application.yml配置文件

spring:
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf-8&useSSL=true&serverTimezone=UTC
    username: root
    password: 1234

  rabbitmq:
    host: 101.201.101.206
    username: guest
    password: guest
    publisher-confirms: true  # 開啟Rabbitmq發送消息確認機制,發送消息到隊列並觸發回調方法
    publisher-returns: true
    listener:
      simple:
        concurrency: 10 #消費者數量
        max-concurrency: 10 #最大消費者數量
        prefetch: 1 #限流(消費者每次從隊列獲取的消息數量)
        auto-startup: true  #啟動時自動啟動容器
        acknowledge-mode: manual #開啟ACK手動確認模式

3,RabbitConfig配置類

1, 定義消息轉換實例 ,轉化成 JSON傳輸
2 , 配置啟用rabbitmq事務

package com.aaa.springredis.controller;

import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@Component
public class RabbitConfig {

    /**
     * 定義消息轉換實例 ,轉化成 JSON傳輸
     *
     * @return Jackson2JsonMessageConverter
     */
    @Bean
    public MessageConverter integrationEventMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 配置啟用rabbitmq事務
     *
     * @param connectionFactory connectionFactory
     * @return RabbitTransactionManager
     */
    @Bean
    public RabbitTransactionManager rabbitTransactionManager(CachingConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }
}

4,初始化rabbitmq的回調函數

說明:被@PostConstruct修飾的方法會在服務器加載Servlet的時候運行,並且只會被服務器執行一次。如果想在生成對象時完成某些初始化操作,而偏偏這些初始化操作又依賴於依賴注入,那么久無法在構造函數中實現。為此,可以使用@PostConstruct注解一個方法來完成初始化,@PostConstruct注解的方法將會在依賴注入完成后被自動調用。

回調函數的使用前提是配置文件中開啟了rabitmq消息確認機制
在這里插入圖片描述

Constructor >> @Autowired >> @PostConstruct

      @Autowired
        RabbitTemplate rabbitTemplate;

    private static final Logger LOGGER = LoggerFactory.getLogger(RabbitController.class);


    @PostConstruct
    private void init(){
        /**
         * 消息發送到交換器Exchange后觸發回調。
         * 使用該功能需要開啟確認,spring-boot中配置如下:
         * spring.rabbitmq.publisher-confirms = true
         */
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean b, String s) {
            if (b) {
                System.out.println("消息已確認 cause:{"+s+"} - {"+correlationData+"}");
            } else {
                System.out.println("消息未確認 cause:{"+s+"} - {"+correlationData+"}");
            }
        }
    });
        /**
         * 通過實現ReturnCallback接口,
         * 如果消息從交換器發送到對應隊列失敗時觸發
         * 比如根據發送消息時指定的routingKey找不到隊列時會觸發
         * 使用該功能需要開啟確認,spring-boot中配置如下:
         * spring.rabbitmq.publisher-returns = true
         */
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                LOGGER.error("消息被退回:{}", message);
                LOGGER.error("消息使用的交換機:{}", exchange);
                LOGGER.error("消息使用的路由鍵:{}", routingKey);
                LOGGER.error("描述:{}", replyText);
            }
        });
    }

三,開始測試

1,寫搶單測試類

寫搶單測試類,我們使用jweter壓力測試工具開啟1000個線程進行測試(開啟多線程並發測試),所以為了區別每一個模擬的用戶,使用userId累加的方式進行區分。

		private int userId=0;
        //開始搶單
        @RequestMapping("/begin")
        @ResponseBody
        public void begin(){
           userId++;
           this.send(userId);
        }

而上面的send方法就是把接收到的用戶請求發送到rabbitmq消息中間件中。

		@RequestMapping("/send")
        @ResponseBody
        public String send(Integer messge){
                //第一個參數:交換機名字  第二個參數:Routing Key的值  第三個參數:傳遞的消息對象
                rabbitTemplate.convertAndSend("test.direct","test",messge);
                return "發送消息成功";
        }

2,配置rabbitmq監聽方法

rabbitmq監聽上篇文章也說過了,作用就是監聽指定隊列中收到來自交換機的消息,來一條收一條,收完為止!
通過 ACK 確認是否被正確接收,每個Message都要被確認,可以手動去 ACK 或自動 ACK,如果信息消費失敗的話會拒絕當前消息,並把消息返回原隊列。

從隊列中收到用戶的userId,然后進行購買商品模擬操作(減少一個庫存,新增一條購買記錄)

 	@Autowired
    RabbitController controller;

    /**
     * @RabbitListener 可以標注在類上面,需配合 @RabbitHandler 注解一起使用
     * @RabbitListener 標注在類上面表示當有收到消息的時候,就交給 @RabbitHandler 的方法處理,具體使用哪個方法處理,
     * 根據 MessageConverter 轉換后的參數類型
     *
     * 使用 @Payload 和 @Headers 注解可以消息中的 body 與 headers 信息
     *
     * 通過 ACK 確認是否被正確接收,每個 Message 都要被確認(acknowledged),可以手動去 ACK 或自動 ACK
     */
    @RabbitListener(queues = "test") //指定監聽的隊列名
    public void receiver(@Payload Integer userId, @Headers Channel channel, Message message) throws IOException {
        LOGGER.info("用戶{}開始搶單", userId);
        try {
            //處理消息
            controller.robbingProduct(userId);
//             確認消息已經消費成功
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            LOGGER.error("消費處理異常:{} - {}", userId, e);
//             拒絕當前消息,並把消息返回原隊列
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

購買商品的方法

 public void robbingProduct(Integer userId){
                Product product = testDao.selectProductByNo("No123321");
                if (product != null && product.getTotal() > 0) {
                    //更新庫存表,庫存量減少1。返回1說明更新成功。返回0說明庫存已經為0
                    int i = testDao.updateProduct("No123321");
                    if(i>0){
                        //插入記錄
                        testDao.insertProductRecord(new ProductRecord("No123321", userId));
                        //發送短信
                        LOGGER.info("用戶{}搶單成功", userId);
                    }else {
                        LOGGER.error("用戶{}搶單失敗", userId);
                    }
                } else {
                    LOGGER.error("用戶{}搶單失敗", userId);
                }

        }

3,jweter工具測試並發

jweter壓力測試工具如何使用百度吧,這里忽略!
控制台打印
在這里插入圖片描述
在這里插入圖片描述
而數據庫中的庫存變成了0
在這里插入圖片描述
購買記錄中存放了搶單成功的用戶id(100條記錄)
在這里插入圖片描述
當然,剩下的900個用戶都搶單失敗了!

rabbitmq隊列是先進先出的順序,先來后到,1000個請求你也得給我排隊,前100個請求搶單成功之后就注定了后900個請求是搶單失敗的!

使用RabbitMQ的最主要變化就是:以前搶單操作請求直接由我們搶單應用程序執行,現在請求被轉移到了RabbitMQ服務器中。RabbitMQ服務器把接收到的搶單請求進行排隊,最后由RabbitMQ服務器把搶單請求轉發到我們的搶單應用程序,這樣的好處就是避免我們的搶單應用程序短時間直接處理大量請求。RabbitMQ服務器主要作用是減緩搶單應用程序的並發壓力,相當於在我們的搶單程序之前加了一道請求緩沖區。

實戰結束!
工程地址:https://github.com/fantongxue666/rabbitmq-seckill


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM