運用RabbitMQ編寫秒殺邏輯


簡介

  • 閱讀本篇,需要具備RabbitMQ的知識,以及其在SpringBoot中的應用。
  • 本篇將使用RabbitMQ制作一個秒殺系統的雛形,其主要充當的作用是流量削峰。

系統架構圖

  • 秒殺邏輯分為兩部分:
    1. spike-client:用於接收購買信息,查詢redis並扣除庫存,購買成功則將用戶信息發送到RabbitMQ
    2. spike-server:用於處理交換機exchange中的用戶信息,程序將使用該信息完成扣庫及訂單生成操作。

image-20201220164450126

  • redis檢查庫存信息並確認用戶具有購買資格后,可以在redis中使用相關的用戶信息,創建一個String類型數據,待訂單創建完成后,更新該數據的值為訂單對象的json格式字符串數據即可。
    1. 客戶端在得知購買成功后,需要持續請求個人的訂單信息,該信息首先會在redis中查詢,未持久化的訂單只能獲得空值;
    2. 待系統持久化完成並寫入redis后,客戶端將請求並獲取到真正的訂單信息;
    3. 客戶端獲取信息后,進入支付階段。

spike-server服務端

  • 大部分是常規的項目代碼,會着重介紹其中較為重要的關於RabbitMQ的部分。
  • 模塊架構:

image-20201220165019391

  • spike_goods的數據庫表同goods數據庫表一致:
    • 注意,這里沒有編寫SpikeGoods.java,該類和Goods.java源碼是完全一致的。
CREATE TABLE `goods` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `description` varchar(30) NOT NULL,
  `spu` varchar(30) NOT NULL,
  `sku` varchar(30) NOT NULL,
  `balance` int(11) NOT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
  • GoodsMapper.xmlGoodsServiceImpl.java源碼:
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">

<mapper namespace="cn.dylanphang.spikeserver.mapper.GoodsMapper">
    <select id="findBySku" resultType="goods" parameterType="string">
        SELECT *
        FROM goods
        WHERE sku = #{goodsSku};
    </select>

    <update id="modifyBalance">
        UPDATE goods
        SET balance = #{param2}
        WHERE sku = #{param1};
    </update>
</mapper>
package cn.dylanphang.spikeserver.service.impl;

import cn.dylanphang.spikeserver.mapper.GoodsMapper;
import cn.dylanphang.spikeserver.pojo.Goods;
import cn.dylanphang.spikeserver.service.GoodsService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

/**
 * @author dylan
 * @date 2020/12/16
 */
@Service("goodsService")
@Transactional(rollbackFor = Exception.class)
public class GoodsServiceImpl implements GoodsService {

    @Resource
    private GoodsMapper goodsMapper;

    @Override
    public Goods findBySku(String goodsSku) {
        return this.goodsMapper.findBySku(goodsSku);
    }

    @Override
    public void modifyBalance(String goodsSku, Integer finalQuantity) {
        this.goodsMapper.modifyBalance(goodsSku, finalQuantity);
    }

    @Override
    public void changeBalance(String goodsSku, Integer changeQuantity) {
        int finalQuantity = this.findBySku(goodsSku).getBalance() - changeQuantity;
        if (finalQuantity < 0) {
            throw new RuntimeException("Balance is not enough.");
        }
        this.modifyBalance(goodsSku, finalQuantity);
    }
}
  • SpikeGoodsMapper.xmlSpikeGoodsMapperImpl.java源碼:
    1. 其中扣庫的過程是先查詢,后扣減,並沒有將sql置於同一條語句中;
    2. 關於FOR UPDATE字句,該字句在使用隊列的情況下,會造成一定的資源浪費,但后續使用非隊列進行對比實驗時,需要使用到此字句保存事務的一致性;
    3. 方法modifyBalance中手動延時80ms模擬處理緩慢的情況。
<?xml version="1.0" encoding="UTF-8" ?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">

<mapper namespace="cn.dylanphang.spikeserver.mapper.SpikeGoodsMapper">
    <select id="findBySku" resultType="goods" parameterType="string">
        SELECT *
        FROM spike_goods
        WHERE sku = #{goodsSku}
        FOR UPDATE;
    </select>

    <update id="modifyBalance">
        UPDATE spike_goods
        SET balance = #{param2}
        WHERE sku = #{param1};
    </update>

    <insert id="insert" parameterType="goods">
        INSERT INTO spike_goods (description, spu, sku, balance)
        VALUES (#{description}, #{spu}, #{sku}, #{balance});
    </insert>

    <delete id="truncate" parameterType="string">
        DELETE
        FROM spike_goods
        WHERE sku = #{goodsSku};
    </delete>
</mapper>
package cn.dylanphang.spikeserver.service.impl;

import cn.dylanphang.spikeserver.mapper.SpikeGoodsMapper;
import cn.dylanphang.spikeserver.pojo.Goods;
import cn.dylanphang.spikeserver.service.GoodsService;
import cn.dylanphang.spikeserver.service.SpikeGoodsService;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;

/**
 * @author dylan
 * @date 2020/12/16
 */
@Service("spikeGoodsService")
@Transactional(rollbackFor = Exception.class)
public class SpikeGoodsServiceImpl implements SpikeGoodsService {

    @Resource
    private GoodsService goodsService;

    @Resource
    private SpikeGoodsMapper spikeGoodsMapper;

    @Override
    public Goods findBySku(String goodsSku) {
        return this.spikeGoodsMapper.findBySku(goodsSku);
    }

    @Override
    public void modifyBalance(String goodsSku, Integer finalQuantity) throws InterruptedException {
        // *.模擬扣庫緩慢的情況
        Thread.sleep(80);
        this.spikeGoodsMapper.modifyBalance(goodsSku, finalQuantity);
    }

    @Override
    public void changeBalance(String goodsSku, Integer changeQuantity) throws InterruptedException {
        int finalQuantity = this.findBySku(goodsSku).getBalance() - changeQuantity;
        if (finalQuantity < 0) {
            throw new RuntimeException("Balance is not enough.");
        }
        this.modifyBalance(goodsSku, finalQuantity);
    }

    @Override
    public void insert(Goods goods) {
        this.spikeGoodsMapper.insert(goods);
    }

    @Override
    public void spikeGoods(String goodsSku, Integer quantity) {
        // 1.庫存扣減
        this.goodsService.changeBalance(goodsSku, quantity);
        // 2.獲取商品信息
        final Goods goods = this.goodsService.findBySku(goodsSku);
        goods.setBalance(quantity);
        // 3.設置秒殺商品
        this.insert(goods);
    }

    @Override
    public void truncate(String goodsSku) {
        this.spikeGoodsMapper.truncate(goodsSku);
    }
}
  • SpikeController中提供了上架秒殺商品接口,及相關處理非隊列實驗時所需要的接口:
package cn.dylanphang.spikeserver.controller;

import cn.dylanphang.spikeserver.pojo.Goods;
import cn.dylanphang.spikeserver.service.GoodsService;
import cn.dylanphang.spikeserver.service.SpikeGoodsService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

/**
 * @author dylan
 * @date 2020/12/16
 */
@RestController
@Slf4j
public class SpikeController {

    @Resource
    private GoodsService goodsService;

    @Resource
    private SpikeGoodsService spikeGoodsService;

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @RequestMapping("/find")
    public Goods find(String goodsSku) {
        return this.goodsService.findBySku(goodsSku);
    }

    /**
     * 決定指定商品用於秒殺的數量。並將該數量的sku和quantity寫入redis用於預查詢。
     *
     * @param goodsSku sku
     * @param quantity quantity
     */
    @RequestMapping("/spike")
    public void spike(String goodsSku, Integer quantity) {
        // *.以下兩條業務代碼需要放置在同一個Service中
        try {
            this.spikeGoodsService.spikeGoods(goodsSku, quantity);
            this.redisTemplate.opsForValue().set(goodsSku, quantity);
        } catch (Exception e) {
            log.info("庫存不足");
        }
    }

    @RequestMapping("/rollback")
    public void rollback(String goodsSku, Integer quantity) {
        this.goodsService.modifyBalance(goodsSku, quantity);
        this.spikeGoodsService.truncate(goodsSku);
    }

    /**
     * 該接口提供直接購買的方式。用於測試2000並發下系統是否崩潰。
     *
     * @param identity id
     * @param goodsSku sku
     * @param quantity quantity
     */
    @RequestMapping("/directBuy")
    public String directBuy(String identity, String goodsSku, Integer quantity) {
        // *.在此方法中實際還需要創建訂單並返回該訂單的編號,在創建訂單的方法中去修改庫存,此處省略
        try {
            this.spikeGoodsService.changeBalance(goodsSku, quantity);
        } catch (Exception e) {
            log.info(identity + "購買失敗。請稍后再試。");
            return "[" + identity + "] Failure. No stock.";
        }
        log.info(identity + "購買" + quantity + "個" + goodsSku + "等待支付。訂單號為:BA[" + identity + "]3740027734074");
        return "[" + identity + "] Successful.";
    }
}
  • RabbitmqConfig.java將在系統啟動時,創建項目所需要的隊列、交換機,及完成它們之間的綁定操作:
package cn.dylanphang.spikeserver.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @author dylan
 * @date 2020/12/16
 */
@Configuration
public class RabbitmqConfig {

    private static final String QUEUE_NAME = "orderQueue";
    private static final String EXCHANGE_NAME = "orderExchange";
    private static final String ROUTING_KEY = "goods.order";

    /**
     * 該Queue是創建給spike-server中的@RabbitListener用於接收信息的。
     *
     * @return Queue
     */
    @Bean("orderQueue")
    public Queue orderQueue() {
        return new Queue(QUEUE_NAME);
    }

    /**
     * 該Exchange是創建給spike-client用於發布消息的。類型為Topic。
     *
     * @return Exchange
     */
    @Bean("orderExchange")
    public Exchange orderExchange() {
        return new TopicExchange(EXCHANGE_NAME);
    }

    /**
     * 綁定Queue與Exchange讓隊列明確需要到那個Exchange中接收消息,並指定該Queue的所接收信息必須攜帶的routingKey.
     *
     * @param orderQueue    Queue
     * @param orderExchange Exchange
     * @return Binding
     */
    @Bean
    public Binding binding(Queue orderQueue, Exchange orderExchange) {
        return BindingBuilder.bind(orderQueue).to(orderExchange).with(ROUTING_KEY).noargs();
    }
}
  • OrderListener將從隊列中有序地取出購買信息並處理:
package cn.dylanphang.spikeserver.listener;

import cn.dylanphang.spikeserver.service.SpikeGoodsService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Properties;

/**
 * @author dylan
 * @date 2020/12/16
 */
@Component
@Slf4j
public class OrderListener {
    private static final String QUEUE_NAME = "orderQueue";

    @Resource
    private SpikeGoodsService spikeGoodsService;

    @RabbitListener(queues = QUEUE_NAME)
    public void handleOrder(String message) {
        final ObjectMapper objectMapper = new ObjectMapper();
        try {
            final Properties properties = objectMapper.readValue(message, Properties.class);
            final String name = properties.getProperty("identity");
            final String sku = properties.getProperty("goodsSku");
            final Integer quantity = Integer.valueOf(properties.getProperty("quantity"));

            // *.在此方法中實際還需要創建訂單並返回該訂單的編號,在創建訂單的方法中去修改庫存,此處省略
            try {
                this.spikeGoodsService.changeBalance(sku, quantity);
            } catch (Exception e) {
                log.info(name + "購買失敗。請稍后再試。");
                return;
            }
            log.info(name + "購買" + quantity + "個" + sku + "等待支付。訂單號為:BA[" + name + "]3740027734074");
        } catch (JsonProcessingException e) {
            log.error(e.getMessage());
            throw new RuntimeException(e);
        }
    }
}
  • application.yml中的配置如下:
    1. 使用了druid數據庫連接池;
    2. spike-server服務端的啟動端口為9090
spring:
  datasource:
    druid:
      db-type: com.alibaba.druid.pool.DruidDataSource
      driver-class-name: com.mysql.cj.jdbc.Driver
      url: jdbc:mysql://localhost:3306/mall?serverTimezone=GMT%2B8&useAffectedRows=true
      username: root
      password: root
      # 初始連接數
      initial-size: 5
      # 最小連接數
      min-idle: 10
      # 最大連接數
      max-active: 20
      # 獲取連接超時時間
      max-wait: 5000
      # 連接有效性檢測時間
      time-between-eviction-runs-millis: 60000
      # 連接在池中最小生存的時間
      min-evictable-idle-time-millis: 300000
      # 連接在池中最大生存的時間
      max-evictable-idle-time-millis: 900000
      test-while-idle: true
      test-on-borrow: false
      test-on-return: false
      # 檢測連接是否有效
      validation-query: select 1
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: dylan
    password: 123456
    virtual-host: /spike-system
  redis:
    host: 192.168.88.210
    port: 6379
mybatis:
  mapper-locations: classpath:mapper/*xml
  type-aliases-package: cn.dylanphang.spikeserver.pojo
server:
  port: 9090

spike-client客戶端

  • 此模塊用於判定用戶是否購買成功,並將相關購買成功的用戶信息,發送到消息隊列中。
  • 模塊架構:

image-20201220170707431

  • RedisConfig.java源碼如下,用於自定義RedisTemplate<String, Object>對象:
    • 此配置類可以省略,實驗存入redis的數據僅僅是商品庫存信息,但多數情況下,項目都會構建此類,用於存儲對象;
    • 需要知道SpringBoot默認不提供RedisTemplate<String, Object>對象;
    • SpringBoot僅提供自動配置的RedisTemplate<Object, Object>RedisTemplate<String, String>對象。
package cn.dylanphang.spikeclient.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * 本類用於自定義RedisTemplate,如果需要用於直接存儲pojo類,那么該類需要進行序列化。
 * 數據需要在網路上進行傳輸,一般都需要進行序列化操作。
 * 其中主要目的是讓value對象可以使用ObjectMapper進行轉換后再序列化,重點是Jackson2JsonRedisSerializer<Object>與ObjectMapper。
 *
 * @author dylan
 * @date 2020/12/16
 */
@Configuration
public class RedisConfig {

    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
        // 0.創建RedisTemplate對象並設置連接方式,默認是lettuce
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory);

        // 1.字符串序列化和對象序列化
        final StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        final Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<>(Object.class);

        // 2.將對象序列化為Json字符串格式的數據,需要為序列化實例設置一個ObjectMapper對象
        // *.如果不對ObjectMapper進行任何配置,那么從redis中取出來的對象會被封裝在一個LinkedHashMap中
        jackson2JsonRedisSerializer.setObjectMapper(new ObjectMapper());

        // 3.key采用String的方式序列化,value采用Jackson的方式序列化
        template.setKeySerializer(stringRedisSerializer);
        template.setValueSerializer(jackson2JsonRedisSerializer);
        template.setHashKeySerializer(stringRedisSerializer);
        template.setHashValueSerializer(jackson2JsonRedisSerializer);

        // 4.初始化RedisTemplate對象
        template.afterPropertiesSet();

        return template;
    }
}
  • RedisServiceImpl.java源碼如下,使用redisTemplate對象操作redis中的數據:
    • 對於redis來說單條的語句可以保證事務的原子性的。
package cn.dylanphang.spikeclient.service.impl;

import cn.dylanphang.spikeclient.service.RedisService;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;

/**
 * @author dylan
 * @date 2020/12/16
 */
@Service("redisService")
public class RedisServiceImpl implements RedisService {

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @Override
    public void modify(String goodsSku, Integer quantity) {
        // *.秒殺自設置的一刻開始就會創建一個redis的String類型數據用於存儲秒殺商品的庫存信息,從redis中扣減數量
        final Long decrement = this.redisTemplate.opsForValue().decrement(goodsSku, quantity);
        if (decrement != null && decrement < 0) {
            throw new RuntimeException("No any stock.");
        }
    }
}
  • SpikeController.java源碼如下:
    1. 使用redisService檢查redis中相關商品是否有庫存;
    2. 使用objectMapper對象將數據包裝為json格式的字符串;
    3. 使用rabbitTemplate中提供的 converAndSend方法,將包裝后的數據發送到交換機exchange中。
package cn.dylanphang.spikeclient.controller;

import cn.dylanphang.spikeclient.service.RedisService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.HashMap;

/**
 * spike-client將需要處理的數據使用ObjectMapper進行處理,得到Json格式字符串,並發送到Exchange: orderExchange中。
 * spike-server中的@RabbitListener會通過orderQueue持續監聽orderExchange中是否有消息,如果有則會被orderQueue所接收到。
 * routingKey是用於識別篩選orderQueue的標志,orderExchange采用的是Topic類型,那么routingKey的設定會更加靈活。
 *
 * @author dylan
 * @date 2020/12/16
 */
@RestController
@Slf4j
public class SpikeController {

    private static final String EXCHANGE_NAME = "orderExchange";
    private static final String ROUTING_KEY = "goods.order";

    @Resource
    private RedisService redisService;

    @Resource
    private RabbitTemplate rabbitTemplate;

    /**
     * Listener中采用線程休眠80ms模擬處理緩慢的情況,此時使用redis存儲庫存信息加以控制,因無庫存而購買失敗的用戶會獲得即時反饋。
     *
     * @param identity id
     * @param goodsSku sku
     * @param quantity quantity
     * @return string
     * @throws JsonProcessingException exception
     */
    @RequestMapping("/buy")
    public String buy(String identity, String goodsSku, Integer quantity) throws JsonProcessingException {
        // 1.修改redis中的庫存信息,其中的庫存信息在秒殺確認的時候被寫入了redis中,如果拋出異常,則搶購失敗
        try {
            this.redisService.modify(goodsSku, quantity);
        } catch (Exception e) {
            return "[" + identity + "] Failure. No stock.";
        }

        // 2.將參數轉為Json格式的字符串,實際中形參可能是一個pojo類型,那么此時可以直接使用ObjectMapper轉換為Json格式字符串
        final HashMap<String, Object> hashMap = new HashMap<>(3);
        hashMap.put("identity", identity);
        hashMap.put("goodsSku", goodsSku);
        hashMap.put("quantity", quantity);

        final ObjectMapper objectMapper = new ObjectMapper();
        final String message = objectMapper.writeValueAsString(hashMap);

        // 3.發送到消息隊列中
        this.rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY, message);

        // 4.當前端收到本條信息后,需要持續請求另一個controller以獲取已創建好的訂單編號等信息,用於支付業務
        return "[" + identity + "] Successful.";
    }

    @RequestMapping("/getOrder")
    public void getOrder(String identity) {
        // *.考慮到成功搶購的人才會請求此接口,那么可以直接查詢數據庫,不需要建立新的隊列了
        log.info("根據用戶信息查詢訂單信息,返回給前端用於支付業務");
    }
}

項目測試

  • 測試類寫在了spike-client中,其中分為兩部分測試:

    1. 不使用RabbitMQ,直接調用spike-server所提供的/directBuy接口;
    2. 使用RabbitMQ,將調用spike-client中所提供的/buy接口。
  • 其中線程池使用了Google提供的guava包,線程池工具類ThreadUtils.java源碼如下:

package cn.dylanphang.spikeclient.util;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

import java.util.concurrent.*;

/**
 * @author dylan
 */
public class ThreadUtils {
    public static void create(Runnable runnable) {

        ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
                .setNameFormat("demo-pool-%d").build();
        ExecutorService singleThreadPool = new ThreadPoolExecutor(2000, 4000,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());

        singleThreadPool.execute(runnable);
    }
}
  • 所使用的數據庫表有兩個,一個為goods,另一個為spike_goods。測試類會首先模擬查詢表goods,獲取目標商品的庫存,並通過訪問spike-server中的/spike接口,設置本商品的秒殺數量,該數據同時會寫入redisspike_goods
  • 關於測試的相關說明:
    • 事實上,所有的測試都應該是自動化進行的,但本實驗中的部分測試非自動化,僅是為了能更好地理解;
    • 因此實際應用中,請以更為規范的方式去編寫測試類。

1. 不使用RabbitMQ

  • 測試類SpikeClientNoRabbitMqTest.java源碼如下:
    1. 每次進行測試前都會重置數據,保證商品實際庫存為2000,用於秒殺的數量為1500
    2. 本次實驗的並發數量為10000條請求,為了便於觀察,其中編寫了計數代碼,在實驗結束后程序會輸出成功的次數。
package cn.dylanphang.spikeclient;

import cn.dylanphang.spikeclient.util.ThreadUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.http.HttpMethod;
import org.springframework.web.client.RestTemplate;

import javax.annotation.Resource;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

/**
 * @author dylan
 * @date 2020/12/17
 */
@Slf4j
@SpringBootTest
public class SpikeClientNoRabbitMqTest {

    public static final String FIND = "http://localhost:9090/find?goodsSku=7742994";
    public static final String SPIKE = "http://localhost:9090/spike?goodsSku=7742994&quantity=1500";
    public static final String ROLLBACK = "http://localhost:9090/rollback?goodsSku=7742994&quantity=2000";
    public static final String GOODS_SKU = "7742994";

    public static final int CONCURRENT_TIME = 10000;

    /**
     * RestTemplate中封裝了httpclient和urlconnection。
     */
    private final RestTemplate restTemplate = new RestTemplate();
    private final CountDownLatch countDownLatch = new CountDownLatch(CONCURRENT_TIME);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @BeforeEach
    void init() throws JsonProcessingException {
        // 1.確認商品總數量
        String body = this.restTemplate.getForEntity(FIND, String.class).getBody();
        Properties properties = this.objectMapper.readValue(body, Properties.class);
        log.info("商品總數量為:{}", properties.getProperty("balance"));

        // 2.划分部分或全部用於秒殺
        this.restTemplate.execute(SPIKE, HttpMethod.GET, null, null);
        log.info("用於秒殺數量:{}", "1500");

        // 3.確認剩余數量
        body = this.restTemplate.getForEntity(FIND, String.class).getBody();
        properties = this.objectMapper.readValue(body, Properties.class);
        log.info("扣減后剩余為:{}", properties.getProperty("balance"));

        // 4.確認redis數據是否正常
        final Object exist = this.redisTemplate.opsForValue().get(GOODS_SKU);
        log.info("redis中存在數量為:{}", null == exist ? "NaN" : (int) exist);
    }

    @Test
    void destroy() {
        // 1.恢復商品總數量與秒殺商品表
        this.restTemplate.execute(ROLLBACK, HttpMethod.GET, null, null);
        // 2.恢復redis
        this.redisTemplate.delete(GOODS_SKU);
    }

    /**
     * 在spike-server的SpikeGoodsServiceImpl類中的modifyBalance里,進行了扣庫緩慢的模擬操作。
     * 此時spike-client中發出的請求受到了扣庫緩慢的影響,部分用戶因為等待超時而購買失敗,部分用戶就算購買成功,等待的時間也過長。
     *
     * @throws InterruptedException 異常
     */
    @Test
    void test() throws InterruptedException {
        int[] finalCounter = new int[]{0};
        for (int i = 0; i < CONCURRENT_TIME; i++) {
            final int counter = i;
            ThreadUtils.create(() -> {
                try {
                    // *.模擬10000台主機同時直接發出請求
                    final String url = this.urlBuild(counter);
                    final RestTemplate restTemplate = new RestTemplate();
                    countDownLatch.await();
                    final String str = restTemplate.getForEntity(url, String.class).getBody();
                    log.info(str);
                    // *.計算成功次數
                    if (str != null && str.contains("Successful")) {
                        finalCounter[0]++;
                    }
                } catch (Exception e) {
                    log.error(e.getMessage());
                }
            });
            this.countDownLatch.countDown();
        }
        // *.防止此方法結束,導致等待中的線程一同結束,需要休眠大概120秒(可以更短,根據性能而定)
        Thread.sleep(120000);
        // *.輸出成功次數
        log.info("Successful times: {}", finalCounter[0]);
    }

    /**
     * 根據傳入的數字拼接字符串。
     *
     * @param counter 數字
     * @return url
     */
    private String urlBuild(int counter) {
        String identity = "";

        if (counter < 10) {
            identity = "000" + counter;
        } else if (counter < 100) {
            identity = "00" + counter;
        } else if (counter < 1000) {
            identity = "0" + counter;
        } else {
            identity = "" + counter;
        }
        return "http://localhost:9090/directBuy?goodsSku=7742994&quantity=1&identity=" + identity;
    }
}
  • 運行測試類,得到如下結果,其中僅有864條購買請求成功寫入MySQL中:

image-20201220174303876

  • 對比數據庫中spike_goods的數量:

image-20201220174925763

  • 顯然用於秒殺的商品數量是符合事務的一致性,總數仍然為864 + 636 = 1500件。
  • 但此時的購買失敗率卻驚人地高,觀察控制台中的其他輸出:
    1. 其中有直接因為服務器當前請求量過大,而直接被拒絕連接的請求所輸出的錯誤日志,這個問題是很嚴重的,因為你當前系統一定不止這一個接口在提供服務,此時如果出現連接被拒絕,那么對於其他在用接口來說也會出現連接被拒絕的情況;
    2. 還有輸出為Failure. No stock的,注意這里並不是因為沒有庫存,而是因為等待數據庫連接對象超時導致的失敗,我們的代碼直接將等待超時拋出的異常歸類為“失敗,無庫存。”,實際中需要進一步對異常進行細分處理。

image-20201220174638639

  • 提示:還記得之前在SELECT字句中使用的FOR UPDATE嗎?如果此時不在SELECT中添加該字句,會導致事務失去一致性。
  • 此時,不同的請求可能查詢到同樣的庫存結果,顯然這是不合理的。FOR UPDATE字句可以保證查詢的數據需要用於更新,其保證了事務的一致性,但卻消耗了不少的系統資源。

2. 使用RabbitMQ

  • 實際項目中,我們需要解決的問題是:
    1. 首先,我們並不希望系統在一瞬間接受過多的請求,這可能會導致系統當前的其他接口的不可用性;
    2. 其次,即使在系統可以承受的請求范圍內,我們的數據庫MySQL也不應該在同一時間處理過多的業務,數據庫連接池的最大連接數量是有限的,如果秒殺系統已經將所有的連接對象占用,也會導致其他需要使用連接對象的業務癱瘓;
    3. 最后,是事務的一致性問題,在直接請求系統接口進行購買的前提下,就必須要保證線程之間事務的一致性。
  • 線程之間的事務是相互獨立的,一個線程中的事務失敗並不會導致另一個線程中的事務失敗,如何保證線程事物的一致性呢?
    1. 在查詢語句上使用FOR UPDATE來進行鎖表的操作,表明查詢的數據是用於更新的;
    2. 將操作寫在同一個sql語句中,但這會造成一定的資源浪費,可能需要在dao層中添加額外的方法。
  • 使用RabbitMQ可以解決以上所有的問題,spike-client配合redis中寫入的庫存信息,可以達到即時反饋用戶是否購買成功的目的,同時通過RabbitMQ將消息發送到指定的交換機中,spike-server只需要從交換機中獲取購買信息創建訂單即可。
  • 測試類SpikeClientApplicationTest如下:
    1. 並發請求數量仍然為10000條;
    2. 其中所有的線程都會請求spike-client中的/buy接口,以請求購買,只有庫存尚存的情況下,請求才會被放行;
    3. 所有被放行的請求,數據都將被裝換為json格式的字符串,並發送到指定的交換機exchange中;
    4. spike-server中的OrderListener則持續監聽orderQueue中來自指定交換機exchange中獲取的消息:
      1. 消息是逐條處理的;
      2. 消息只有完成持久化后,才會進行下一條消息的處理。
package cn.dylanphang.spikeclient;

import cn.dylanphang.spikeclient.util.ThreadUtils;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.http.HttpMethod;
import org.springframework.test.web.servlet.MockMvc;
import org.springframework.test.web.servlet.request.MockHttpServletRequestBuilder;
import org.springframework.test.web.servlet.request.MockMvcRequestBuilders;
import org.springframework.test.web.servlet.setup.MockMvcBuilders;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.context.WebApplicationContext;

import javax.annotation.Resource;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

@Slf4j
@SpringBootTest
class SpikeClientApplicationTest {

    public static final String FIND = "http://localhost:9090/find?goodsSku=7742994";
    public static final String SPIKE = "http://localhost:9090/spike?goodsSku=7742994&quantity=1500";
    public static final String ROLLBACK = "http://localhost:9090/rollback?goodsSku=7742994&quantity=2000";
    public static final String GOODS_SKU = "7742994";

    public static final int CONCURRENT_TIME = 10000;

    /**
     * RestTemplate中封裝了httpclient和urlconnection。
     */
    private final RestTemplate restTemplate = new RestTemplate();
    private final CountDownLatch countDownLatch = new CountDownLatch(CONCURRENT_TIME);
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Resource
    private WebApplicationContext wac;

    @Resource
    private RedisTemplate<String, Object> redisTemplate;

    @BeforeEach
    void init() throws JsonProcessingException {
        // 1.確認商品總數量
        String body = this.restTemplate.getForEntity(FIND, String.class).getBody();
        Properties properties = this.objectMapper.readValue(body, Properties.class);
        log.info("商品總數量為:{}", properties.getProperty("balance"));

        // 2.划分部分或全部用於秒殺
        this.restTemplate.execute(SPIKE, HttpMethod.GET, null, null);
        log.info("用於秒殺數量:{}", "1500");

        // 3.確認剩余數量
        body = this.restTemplate.getForEntity(FIND, String.class).getBody();
        properties = this.objectMapper.readValue(body, Properties.class);
        log.info("扣減后剩余為:{}", properties.getProperty("balance"));

        // 4.確認redis數據是否正常
        final Object exist = this.redisTemplate.opsForValue().get(GOODS_SKU);
        log.info("redis中存在數量為:{}", null == exist ? "NaN" : (int) exist);
    }

    @Test
    void destroy() {
        // 1.恢復商品總數量與秒殺商品表
        this.restTemplate.execute(ROLLBACK, HttpMethod.GET, null, null);
        // 2.恢復redis
        this.redisTemplate.delete(GOODS_SKU);
    }

    /**
     * 模擬高並發情況下,使用RabbitMQ削峰的過程。
     *
     * @throws InterruptedException 異常
     */
    @Test
    void contextLoads() throws InterruptedException {
        for (int i = 0; i < CONCURRENT_TIME; i++) {
            final int counter = i;
            ThreadUtils.create(() -> {
                try {
                    // *.測試並發的時候需要將MockMvc置入線程內,模擬10000台主機同時發出請求,不能寫到線程之外
                    final MockMvc mockMvc = MockMvcBuilders.webAppContextSetup(this.wac).build();
                    final String url = this.urlBuild(counter);
                    final MockHttpServletRequestBuilder request = MockMvcRequestBuilders.get(url);

                    countDownLatch.await();
                    final String str = mockMvc.perform(request).andReturn().getResponse().getContentAsString();
                    log.info(str);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            this.countDownLatch.countDown();
        }
        // *.防止此方法結束,導致等待中的線程一同結束,需要休眠大概40秒(可以更短,根據性能而定)
        Thread.sleep(40000);
    }

    /**
     * 根據傳入的數字拼接字符串。
     * @param counter 數字
     * @return url
     */
    private String urlBuild(int counter) {
        String identity = "";

        if (counter < 10) {
            identity = "000" + counter;
        } else if (counter < 100) {
            identity = "00" + counter;
        } else if (counter < 1000) {
            identity = "0" + counter;
        } else {
            identity = "" + counter;
        }
        return "http://localhost:8080/buy?goodsSku=7742994&quantity=1&identity=" + identity;
    }
}
  • 運行測試類,可以觀察到測試類中的所有線程,在測試線程休眠的40秒中,spike-client就已經完成了所有的操作,並立即響應給用戶是否成功的結果,而在此期間也沒有出現任何的異常。

image-20201220181711464

  • 此時的spike-server服務端正在有序地從隊列中獲取購買信息,並逐條進行持久化操作:

image-20201220181902641

  • spike-server處理完畢后,數據庫中的秒殺商品數量清空,沒有出現超賣的現象:

image-20201220182016686

  • 此時RabbitMQ中的消息也一並被處理完畢:

image-20201220182024857

  • 在使用RabbitMQ后,所有的並發請求由始至終只會占用了一個數據庫連接對象(可能不是同一個);
  • 同時也不再需要添加FOR UPDATE字句,所有的任務都將有序地進行,同時不會影響到系統其它部分的正常運作。

總結

  • 使用RabbitMQ能有效地達到流量削峰的目的,減輕系統的負擔。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM