實現防止超賣的幾種方式


Preface:

  所有場景都不是生產環境,作為Demo僅供參考,測試工具為Jmeter.用nginx做分發

upstream lock{
     #//不同於網上一般的解釋,我測試的結果為: 未建立連接次數達到max_fails次后,該服務器會被屏蔽fail_timeout時間,這期間該服務器將不再接收請求
            server 127.0.0.1:8082 weight=1 max_fails=3  fail_timeout=10s;  
            server 127.0.0.1:8081 weight=1 max_fails=3  fail_timeout=10s;
    }
    server {
        listen       80;
        server_name  localhost;
        location / {
            proxy_pass http://lock;
            proxy_connect_timeout 1;    //這里是nginx與服務器的連接等候時間,超過該時間未建立連接則放棄,轉發請求到其他服務器
        }
    }

Test:

  case one:   數據庫version字段 樂觀鎖

//單體應用 分布式都可以用 問題就是所有請求都懟到數據庫了 數據庫壓力大
@GetMapping("/pay")
public String testPayByCAS(){
    for (int i = 0; i < 5; i++) {
        Goods goods = goodsMapper.selectByPrimaryKey(1);
        int storage = goods.getGoodsStorage();
        int version = goods.getVersion();
        goods.setGoodsStorage(storage-1);
        goods.setVersion(version+1);
        if(goods.getGoodsStorage()>=0){
            GoodsExample goodsExample = new GoodsExample();
            goodsExample.createCriteria().andVersionEqualTo(version).andIdEqualTo(goods.getId());
            int res = goodsMapper.updateByExample(goods,goodsExample);
            if(res==0){
                continue;
            }else{
                log.info("本次購買商品一件,剩余庫存{}件",goods.getGoodsStorage());
                return "本次購買商品一件,剩余庫存"+goods.getGoodsStorage()+"件";
            }
        }else{
            log.info("庫存不足");
            return "庫存不足";
        }
    }
    return "網絡延遲,請稍后嘗試";
}

  case:two  redis setnx ex  

//利用redis 做分布式鎖,此處實現需要根據實際場景進行優化. 好處是能分擔數據庫壓力,但加鎖的時間無法確定,需要另啟線程進行延時處理
//有個問題是 如果子線程延期6次后 主線程還未運行完畢 后續又會引發很多問題,這里可以 結合case1的樂觀鎖一起使用,用version字段雙重保險,或許本來就應該這么做
static ThreadPoolExecutor pool = new ThreadPoolExecutor(10,
            20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20));
    //這里最高並發限制50,我這里只有拿到鎖了 才開啟線程,早已有余,商品種類多,可以考慮加大隊列/最大線程數
    static {
        pool.prestartAllCoreThreads(); //預先創建核心線程
    }
    @GetMapping("/payII")
    public String testPayLockByRedis() throws InterruptedException {
        //這里最好還是用userId
        String name = Thread.currentThread().getName();
        for (int i = 0; i < 5; i++) {
            Boolean lock = redisTemplate.opsForValue().setIfAbsent("1", name, 10000, TimeUnit.MILLISECONDS);
            if (lock) {
                MyRunnable m = new MyRunnable(name);
                try {
                    pool.execute(m);
                    Goods goods = goodsMapper.selectByPrimaryKey(1);
                    int storage = goods.getGoodsStorage();
                    goods.setGoodsStorage(storage - 1);
                    if (goods.getGoodsStorage() >= 0) {
                        goodsMapper.updateByPrimaryKey(goods);
                        log.info("本次購買商品一件,剩余庫存{}件", goods.getGoodsStorage());
                        return "本次購買商品一件,剩余庫存" + goods.getGoodsStorage() + "件";
                    } else {
                        log.info("庫存不足");
                        return "庫存不足";
                    }
                } catch (Exception e) {
            //這里拋出異常讓事務回滾 , 異常部分讓切面處理,優先級和事務一致,優先級一致事務先執行
                    throw new RunTimeException(e);
                } finally {
                    m.flag = false;
            //這里的處理是因為 子線程在延時6次后,沒有中斷主線程的運行(這里無法中斷,線程之間的運行是獨立的,子線程拋出異常無法被主線程捕獲,至多讓thread設置一個
//UncaughtExceptionHandler,在子線程拋出異常后,子線程內部自己進行捕獲處理邏輯,然而還是不能影響主線程),既然如此,存在30s過后主線程仍未執行完畢的可能性,此時鎖已易主,如果未
//和version字段一起做保險處理,建議拋出異常,回滾事務
                     Object o = redisTemplate.opsForValue().get("1")
              if (o!=null && name.equals(o.toString())) {
                 redisTemplate.delete("1");
                    }else{throw new RuntimeException("鎖已失效")}
                }
            } else {
                continue;
            }
        }
        return "網絡延遲,請稍后嘗試";
    }

    private class MyRunnable implements Runnable {
        boolean flag = true;
        int time;  //延期次數
        String name;  //線程name   用userId好些
        private MyRunnable(String name) {this.name = name;}

        @Override
        public void run() {
            try {
                //給予一定的處理時間 再給任務做延時處理
                Thread.sleep(2000);
                while (flag && time++ < 6) {
                    Object o = redisTemplate.opsForValue().get("1");
                    //鎖存在 且是自身加的鎖  給鎖延期
                    if (o != null && o.toString().equals(name) && flag) {
                        //如果出現判定通過
                           /* case1:  外面修改flag  這里延期成功  外面刪除  並不影響
                              case2:  外面修改flag  刪除  還未往redis 重新set   這里就延期  也不影響
                              case3:  外面修改flag  刪除  其他用戶往redis 重新set   這里再延期 好像也不會發生什么
                            */
                        redisTemplate.expire("1", 10000, TimeUnit.MILLISECONDS);
                        System.out.println("延時一次");
                    }
                    Thread.sleep(3000);
                }
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

  case 3: redisson

//redisson 已經把上面我們想的到的 以及想不到的 都做了,我們只管用就好了,另外redisson還可以使用multilock,將一組鎖當做一個鎖來加鎖和釋放,保證原子性
//如我影院Demo中用的就是:  其實redisson這塊認知還很淺,包括redis也是,有時間還是需要鑽研鑽研
    RLock[] locks = new RLock[list.size()];
    for (int i = 0; i < list.size(); i++) {
      RLock lock = redisson.getLock(String.valueOf(list.get(i).getFtpId()));
      ftps.add(list.get(i).getFtpId());
      locks[i] = lock;
  }
  RLock multiLock = redisson.getMultiLock(locks);
    multiLock.lock();
  .....
  multiLock.unlock();

  @GetMapping("/payIII")
    public String payLockByRedisson() {
        RLock lockI = redisson.getLock("1");
        try {
            lockI.lock(15, TimeUnit.SECONDS);
            Goods goods = goodsMapper.selectByPrimaryKey(1);
            int storage = goods.getGoodsStorage();
            goods.setGoodsStorage(storage - 1);
            if (goods.getGoodsStorage() >= 0) {
                goodsMapper.updateByPrimaryKey(goods);
                System.out.println("本次購買商品一件,剩余庫存" + goods.getGoodsStorage() + "件");
                return "本次購買商品一件,剩余庫存" + goods.getGoodsStorage() + "件";
            } else {
                System.out.println("庫存不足");
                return "庫存不足";
            }
        } finally {
            lockI.unlock();
        }
    }            

  case four:  Aqs   

//下面看下 用抽象隊列同步器做的鎖 ,ReentrantLock的底層,當然我不是看了底層自己敲出來的,暫時還沒時間去研究那些. 這是以前網上學的,這里分享下
  @GetMapping("/payIv")
    public String payLockByAqs() {
        aqsLock.lock();
        Goods goods = goodsMapper.selectByPrimaryKey(1);
        int storage = goods.getGoodsStorage();
        goods.setGoodsStorage(storage - 1);
        if (goods.getGoodsStorage() >= 0) {
            goodsMapper.updateByPrimaryKey(goods);
            System.out.println("本次購買商品一件,剩余庫存" + goods.getGoodsStorage() + "件");
            aqsLock.unlock();
            return "本次購買商品一件,剩余庫存" + goods.getGoodsStorage() + "件";
        } else {
            System.out.println("庫存不足");
            aqsLock.unlock();
            return "庫存不足";
        }
    }

  AqsLock   

//模擬下流程:a,b,c進入方法,a獲取到鎖,b,c也嘗試獲取,cas操作失敗,按順序加入隊列,a解鎖,喚醒隊列第一個線程b,b醒來獲取鎖,自身為隊首,移除,執行操作,解鎖喚醒c重復操作
@Component
public class AqsLock {
    private volatile int state = 0;   //保證線程之間的通信
    private static final Unsafe UNSAFE = UnsafeInstance.getUnsafeInstance();
    private ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();  //這是一個無界隊列

    public void lock(){
        Thread current = Thread.currentThread();
        //如果獲取鎖 就無事發生,沒獲取到鎖將進入阻塞狀態
        if(acquire(current)){
            return;
        }
        waiters.add(current);
        for(;;){
            //peek 獲取頭元素 並嘗試獲取鎖 ,能進行這層判斷,肯定在某處被喚醒,類似於 notify
            if(acquire(current)){
                return;
            }
            LockSupport.park(current);  //LockSupport 內部封裝的Unsafe類
        }
    }

    public void unlock(){
        //將標志位歸原 其他線程才有獲得鎖的可能
        compareAndSwapState(state,0);
        Thread thread = waiters.peek();
        if(thread!=null){
            //這里可以理解為 notify,或者說 condition.signal()后 unlock()
            LockSupport.unpark(thread);
        }
    }
    private static long stateOffset;
    static{
        try {
            //獲取偏移量
            stateOffset = UNSAFE.objectFieldOffset(AqsLock.class.getDeclaredField("state"));
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        }
    }
    public boolean acquire(Thread thread){
        if(state==0){
            if(compareAndSwapState(0,1)){
                // 無競爭情況下 自身是不會進入隊列的 所以多一層判斷    poll為獲取元素並且從隊列中移除,如果隊列為空返回null
                if(thread == waiters.peek()){
                    waiters.poll();
                }
                return true;
            }
        }
        return false;
    }
    public final boolean compareAndSwapState(int expect,int update){
        return UNSAFE.compareAndSwapInt(this,stateOffset,expect,update);
    }
}

  Unsafe

  //構造私有 只能反射獲取
public
class UnsafeInstance { public static Unsafe getUnsafeInstance() { Field theUnsafe = null; try { theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true); return (Unsafe)theUnsafe.get(null); } catch (NoSuchFieldException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } return null; } }

  case five:  synchronized

  //閑來無事 順手測下synchronized 效率竟然比case one高, 當然 這是錯誤的示范,synchronized只能加在方法內,那么多件商品,不可能共享一把鎖
  //synchronized鎖對象 一定要保證是一個唯一對象, 想簡單點 用String 的intern()方法,從常量池獲取
  @GetMapping("/payV")
    public synchronized String payLockBySynchronized() {
        Goods goods = goodsMapper.selectByPrimaryKey(1);
        int storage = goods.getGoodsStorage();
        goods.setGoodsStorage(storage - 1);
        if (goods.getGoodsStorage() >= 0) {
            goodsMapper.updateByPrimaryKey(goods);
            System.out.println("本次購買商品一件,剩余庫存" + goods.getGoodsStorage() + "件");
            return "本次購買商品一件,剩余庫存" + goods.getGoodsStorage() + "件";
        } else {
            System.out.println("庫存不足");
            return "庫存不足";
        }
    }

  case six :   ReentrantLock

//  既然synchronized都測了 就再測下ReentrantLock  和synchronized 自己封裝的AQS 效率都差不多
  ReentrantLock reentrantLock = new ReentrantLock();
    @GetMapping("/payVI")
    public String payLockByReentrantLock() {
        reentrantLock.lock();
        Goods goods = goodsMapper.selectByPrimaryKey(1);
        int storage = goods.getGoodsStorage();
        goods.setGoodsStorage(storage - 1);
        if (goods.getGoodsStorage() >= 0) {
            goodsMapper.updateByPrimaryKey(goods);
            System.out.println("本次購買商品一件,剩余庫存" + goods.getGoodsStorage() + "件");
            myLock.lock.unlock();
            return "本次購買商品一件,剩余庫存" + goods.getGoodsStorage() + "件";
        } else {
            reentrantLock.unlock();
            System.out.println("庫存不足");
            return "庫存不足";
        }
    }

 

  


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM