redis分布式鎖扣減庫存弊端: 吞吐量低, 解決方法:使用 分段鎖 分布式分段鎖並發扣減庫存--代碼實現



package tech.codestory.zookeeper.aalvcai.ConcurrentHashMapLock;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import org.redisson.Redisson;
import org.redisson.api.RBucket;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

import java.io.*;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

/**
 * @version 1.0.0
 * @@menu <p>
 * @date 2021/6/10 14:33
 */
public class SegmentDistributeLock {
    /**
     *  使用redis分布式鎖扣減庫存,弊端: 請求量大的話,會導致吞吐量降低
     *  優化: 分段鎖並發扣減庫存
     *      將表中的庫存字段 分為 5個庫存字段, 然后導入redis,庫存預熱, 然后參考ConcurrentHashMap的分段鎖思想
     *      來一個請求后,對庫存字段 加 分段鎖, 分段鎖扣減庫存
     *      如果當前分段鎖庫存不夠,就扣減掉當前的庫存,然后去鎖下一個分段鎖,扣減庫存
     *
     *      git: https://gitee.com/easybao/segmentDistributeLock.git
     *      依賴jar包:
     *       <dependency>
     *             <groupId>org.redisson</groupId>
     *             <artifactId>redisson</artifactId>
     *             <version>3.13.5</version>
     *         </dependency>
     */
    RedissonClient redissonClient;
    RBucket<RedisStock[]> bucket;
    private ThreadLocal<StockRequest> threadLocal = new ThreadLocal<>();
    static volatile RedisStock[] redisStocks;
    private final int beginTotalNum; //初始總庫存,避免並發過程中 調用getCurrentTotalNum()獲取到的總庫存發生變化

    public SegmentDistributeLock()  {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        this.redissonClient = Redisson.create(config);

        redisStocks = new RedisStock[5];
        redisStocks[0] = new RedisStock("pId_stock_00",20);
        redisStocks[1] = new RedisStock("pId_stock_01",20);
        redisStocks[2] = new RedisStock("pId_stock_02",20);
        redisStocks[3] = new RedisStock("pId_stock_03",20);
        redisStocks[4] = new RedisStock("pId_stock_04",20);
        // 初始總庫存
        this.beginTotalNum = getCurrentTotalNum();

        // 庫存預熱,存到redis中 ,  這里沒有采用因為將庫存預熱存到redis中,取出來的時候,解析異常, 不想花時間解決,所以將庫存預熱 變成一個類變量
//        bucket = redissonClient.getBucket("pId_stock");
//        bucket.set(redisStocks);

    }
    public RedissonClient getRedissonClient(){
        return this.redissonClient;
    }

    public int getCurrentTotalNum(){
        // 獲取實時總庫存
        return Stream.of(redisStocks).mapToInt(RedisStock::getNum).sum();
    }


    /**
     *  使用redis分布式鎖扣減庫存,弊端: 請求量大的話,會導致吞吐量降低
     *  優化: 分段鎖並發扣減庫存
     *      將表中的庫存字段 分為 5個庫存字段, 然后導入redis,庫存預熱, 然后參考ConcurrentHashMap的分段鎖思想
     *      來一個請求后,對庫存字段 加 分段鎖, 分段鎖扣減庫存
     *      如果當前分段鎖庫存不夠,就扣減掉當前的庫存,然后去鎖下一個分段鎖,扣減庫存
     * @param request
     * @return
     */
    public boolean handlerStock_02(StockRequest request) {
        // 先做校驗: 判斷扣減庫存 是否比 初始總庫存還大,是的話就直接false,  避免無限循環扣減不了
        if(request.getBuyNum() > this.beginTotalNum){
            return false;
        }
        // 使用本地線程變量保存請求,確保參數只在本線程使用
        threadLocal.set(request);

        // 這里使用 ThreadLocal代碼邏輯和ConcurrentHashMap的分段鎖
        RedissonClient redissonClient = getRedissonClient();
        RedisStock[] tab = redisStocks;
        int len = tab.length;
        int i = (request.getMemberId().hashCode() < 0 ? 0 : request.getMemberId().hashCode() ) % len ;

        for(RedisStock e = tab[i]; e != null; e = tab[i = nextIndex(i,len)]){

            RLock segmentLock = null;
            try {
                // 2: 對該元素加分布式分段鎖
                segmentLock = redissonClient.getLock(e.getStockName());
                segmentLock.lock();

                int buyNum = threadLocal.get().getBuyNum();
                if (buyNum <= e.getNum()) {
                    //扣減庫存
                    e.setNum(e.getNum() - buyNum);
                    // 扣減成功后,跳出循環,返回結果
                    return true;
                }else{
                    // 如果並發過程中獲取到總庫存<= 0 說明已經沒有庫存了,  如果當前需要扣減的庫存 > 此時總庫存就返回false,扣件失敗
                    if (getCurrentTotalNum() <= 0 || threadLocal.get().getBuyNum() > getCurrentTotalNum()) {
                        // 沒有庫存就false
                        System.out.println(Thread.currentThread().getName() + " 扣減庫存數: " + threadLocal.get().getBuyNum() + "失敗" + "   此時總庫存為: " + getCurrentTotalNum());
                        return false;
                    }
                    // 扣減掉當前的 分段鎖對應的庫存,然后對下一個元素加鎖
                    threadLocal.get().setBuyNum( buyNum - e.getNum());
                    e.setNum(0);
                }
            } finally {
                // 3: 解鎖
                segmentLock.unlock();
            }
        }
        threadLocal.remove();
        return false;
    }

    private static int nextIndex(int i, int len) {
        return ((i + 1 < len) ? i + 1 : 0);
    }
    public static int FNVHash(String key) {
        final int p = 16777619;
        Long hash = 2166136261L;
        for (int idx = 0, num = key.length(); idx < num; ++idx) {
            hash = (hash ^ key.charAt(idx)) * p;
        }
        hash += hash << 13;
        hash ^= hash >> 7;
        hash += hash << 3;
        hash ^= hash >> 17;
        hash += hash << 5;
        if (hash < 0) {
            hash = Math.abs(hash);
        }
        return hash.intValue();
    }

    // 顯示redis中的庫存
    public void showStocks(){
        for (RedisStock redisStock : redisStocks) {
            System.out.println(redisStock);
        }
    }

    @AllArgsConstructor
    class RedisStock implements Serializable {
        // 庫存字段
        String stockName;
        // 庫存數據, 原子類來保證原子性 num的原子性
        AtomicInteger num;

        public RedisStock(String stockName, int num) {
            this.stockName = stockName;
            this.num = new AtomicInteger(num);
        }

        public void setNum(int num) {
            this.num.set(num);
        }

        public String getStockName() {
            return stockName;
        }

        public void setStockName(String stockName) {
            this.stockName = stockName;
        }

        public int getNum() {
            return this.num.get();
        }

        @Override
        public String toString() {
            return "RedisStock{" +
                    "stockName='" + stockName + '\'' +
                    ", num=" + num.get() +
                    '}';
        }
    }
}
@Getter
@Setter
@AllArgsConstructor
class StockRequest implements Serializable{
    //會員id
    String memberId;
    //購買數量
    int buyNum;
}

class SegmentDistributeLockTest{
    public static void main(String[] args) throws IOException, ClassNotFoundException {
        // 模擬單線程扣減
        SegmentDistributeLock segmentDistributeLock = new SegmentDistributeLock();
        if(segmentDistributeLock.handlerStock_02(new StockRequest("memberId_001",54))){
            System.out.println("扣減成功");
        }else{
            System.out.println("扣減失敗");
        }
        segmentDistributeLock.showStocks();
        /**
         * 成功; 結果為:
         * RedisStock{stockName='pId_stock_00', num=0}  扣減了20個
         * RedisStock{stockName='pId_stock_01', num=10} 扣減了10個
         * RedisStock{stockName='pId_stock_02', num=20}
         * RedisStock{stockName='pId_stock_03', num=20}
         * RedisStock{stockName='pId_stock_04', num=20}
         */
    }
}

class ConcurrentTest implements Runnable{
    // 模擬10個線程並發
    private static CountDownLatch countDownLatch = new CountDownLatch(10);
    private static SegmentDistributeLock segmentDistributeLock = new SegmentDistributeLock();
    int num; //購買數量
    //會員id
    String memberId;

    public ConcurrentTest(String memberId,int num) {
        this.num = num;
        this.memberId = memberId;
    }

    public static void main(String[] args) throws InterruptedException {
        Random random = new Random();
        // 模擬並發扣減庫存(扣減1-50個)
        for (int i = 0; i < 10; i++) {
            new Thread(new ConcurrentTest("memberId_"+i,random.nextInt(50) + 1),"線程"+i).start();
            countDownLatch.countDown();
        }
        TimeUnit.SECONDS.sleep(5);
        // 並發扣減庫存結束,查詢最終庫存
        System.out.println("-----並發扣減庫存結束,查看剩余庫存-------");
        System.out.println("-----並發扣減庫存結束,查看剩余庫存-------");
        System.out.println("-----並發扣減庫存結束,查看剩余庫存-------");
        segmentDistributeLock.showStocks();
    }
    @Override
    public void run() {
        try {
            StockRequest request = new StockRequest(this.memberId, this.num);
            // 在此阻塞,等到計數器歸零之后,再同時開始 扣庫存
            System.out.println(Thread.currentThread().getName() + "已到達, 即將開始扣減庫存: "+ this.num);
            countDownLatch.await();
            if(segmentDistributeLock.handlerStock_02(request)){
                System.out.println(Thread.currentThread().getName() + " 扣減成功, 扣減庫存為: " + this.num);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

運行結果:


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM