实现防止超卖的几种方式


Preface:

  所有场景都不是生产环境,作为Demo仅供参考,测试工具为Jmeter.用nginx做分发

upstream lock{
     #//不同于网上一般的解释,我测试的结果为: 未建立连接次数达到max_fails次后,该服务器会被屏蔽fail_timeout时间,这期间该服务器将不再接收请求
            server 127.0.0.1:8082 weight=1 max_fails=3  fail_timeout=10s;  
            server 127.0.0.1:8081 weight=1 max_fails=3  fail_timeout=10s;
    }
    server {
        listen       80;
        server_name  localhost;
        location / {
            proxy_pass http://lock;
            proxy_connect_timeout 1;    //这里是nginx与服务器的连接等候时间,超过该时间未建立连接则放弃,转发请求到其他服务器
        }
    }

Test:

  case one:   数据库version字段 乐观锁

//单体应用 分布式都可以用 问题就是所有请求都怼到数据库了 数据库压力大
@GetMapping("/pay")
public String testPayByCAS(){
    for (int i = 0; i < 5; i++) {
        Goods goods = goodsMapper.selectByPrimaryKey(1);
        int storage = goods.getGoodsStorage();
        int version = goods.getVersion();
        goods.setGoodsStorage(storage-1);
        goods.setVersion(version+1);
        if(goods.getGoodsStorage()>=0){
            GoodsExample goodsExample = new GoodsExample();
            goodsExample.createCriteria().andVersionEqualTo(version).andIdEqualTo(goods.getId());
            int res = goodsMapper.updateByExample(goods,goodsExample);
            if(res==0){
                continue;
            }else{
                log.info("本次购买商品一件,剩余库存{}件",goods.getGoodsStorage());
                return "本次购买商品一件,剩余库存"+goods.getGoodsStorage()+"件";
            }
        }else{
            log.info("库存不足");
            return "库存不足";
        }
    }
    return "网络延迟,请稍后尝试";
}

  case:two  redis setnx ex  

//利用redis 做分布式锁,此处实现需要根据实际场景进行优化. 好处是能分担数据库压力,但加锁的时间无法确定,需要另启线程进行延时处理
//有个问题是 如果子线程延期6次后 主线程还未运行完毕 后续又会引发很多问题,这里可以 结合case1的乐观锁一起使用,用version字段双重保险,或许本来就应该这么做
static ThreadPoolExecutor pool = new ThreadPoolExecutor(10,
            20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20));
    //这里最高并发限制50,我这里只有拿到锁了 才开启线程,早已有余,商品种类多,可以考虑加大队列/最大线程数
    static {
        pool.prestartAllCoreThreads(); //预先创建核心线程
    }
    @GetMapping("/payII")
    public String testPayLockByRedis() throws InterruptedException {
        //这里最好还是用userId
        String name = Thread.currentThread().getName();
        for (int i = 0; i < 5; i++) {
            Boolean lock = redisTemplate.opsForValue().setIfAbsent("1", name, 10000, TimeUnit.MILLISECONDS);
            if (lock) {
                MyRunnable m = new MyRunnable(name);
                try {
                    pool.execute(m);
                    Goods goods = goodsMapper.selectByPrimaryKey(1);
                    int storage = goods.getGoodsStorage();
                    goods.setGoodsStorage(storage - 1);
                    if (goods.getGoodsStorage() >= 0) {
                        goodsMapper.updateByPrimaryKey(goods);
                        log.info("本次购买商品一件,剩余库存{}件", goods.getGoodsStorage());
                        return "本次购买商品一件,剩余库存" + goods.getGoodsStorage() + "件";
                    } else {
                        log.info("库存不足");
                        return "库存不足";
                    }
                } catch (Exception e) {
            //这里抛出异常让事务回滚 , 异常部分让切面处理,优先级和事务一致,优先级一致事务先执行
                    throw new RunTimeException(e);
                } finally {
                    m.flag = false;
            //这里的处理是因为 子线程在延时6次后,没有中断主线程的运行(这里无法中断,线程之间的运行是独立的,子线程抛出异常无法被主线程捕获,至多让thread设置一个
//UncaughtExceptionHandler,在子线程抛出异常后,子线程内部自己进行捕获处理逻辑,然而还是不能影响主线程),既然如此,存在30s过后主线程仍未执行完毕的可能性,此时锁已易主,如果未
//和version字段一起做保险处理,建议抛出异常,回滚事务
                     Object o = redisTemplate.opsForValue().get("1")
              if (o!=null && name.equals(o.toString())) {
                 redisTemplate.delete("1");
                    }else{throw new RuntimeException("锁已失效")}
                }
            } else {
                continue;
            }
        }
        return "网络延迟,请稍后尝试";
    }

    private class MyRunnable implements Runnable {
        boolean flag = true;
        int time;  //延期次数
        String name;  //线程name   用userId好些
        private MyRunnable(String name) {this.name = name;}

        @Override
        public void run() {
            try {
                //给予一定的处理时间 再给任务做延时处理
                Thread.sleep(2000);
                while (flag && time++ < 6) {
                    Object o = redisTemplate.opsForValue().get("1");
                    //锁存在 且是自身加的锁  给锁延期
                    if (o != null && o.toString().equals(name) && flag) {
                        //如果出现判定通过
                           /* case1:  外面修改flag  这里延期成功  外面删除  并不影响
                              case2:  外面修改flag  删除  还未往redis 重新set   这里就延期  也不影响
                              case3:  外面修改flag  删除  其他用户往redis 重新set   这里再延期 好像也不会发生什么
                            */
                        redisTemplate.expire("1", 10000, TimeUnit.MILLISECONDS);
                        System.out.println("延时一次");
                    }
                    Thread.sleep(3000);
                }
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

  case 3: redisson

//redisson 已经把上面我们想的到的 以及想不到的 都做了,我们只管用就好了,另外redisson还可以使用multilock,将一组锁当做一个锁来加锁和释放,保证原子性
//如我影院Demo中用的就是:  其实redisson这块认知还很浅,包括redis也是,有时间还是需要钻研钻研
    RLock[] locks = new RLock[list.size()];
    for (int i = 0; i < list.size(); i++) {
      RLock lock = redisson.getLock(String.valueOf(list.get(i).getFtpId()));
      ftps.add(list.get(i).getFtpId());
      locks[i] = lock;
  }
  RLock multiLock = redisson.getMultiLock(locks);
    multiLock.lock();
  .....
  multiLock.unlock();

  @GetMapping("/payIII")
    public String payLockByRedisson() {
        RLock lockI = redisson.getLock("1");
        try {
            lockI.lock(15, TimeUnit.SECONDS);
            Goods goods = goodsMapper.selectByPrimaryKey(1);
            int storage = goods.getGoodsStorage();
            goods.setGoodsStorage(storage - 1);
            if (goods.getGoodsStorage() >= 0) {
                goodsMapper.updateByPrimaryKey(goods);
                System.out.println("本次购买商品一件,剩余库存" + goods.getGoodsStorage() + "件");
                return "本次购买商品一件,剩余库存" + goods.getGoodsStorage() + "件";
            } else {
                System.out.println("库存不足");
                return "库存不足";
            }
        } finally {
            lockI.unlock();
        }
    }            

  case four:  Aqs   

//下面看下 用抽象队列同步器做的锁 ,ReentrantLock的底层,当然我不是看了底层自己敲出来的,暂时还没时间去研究那些. 这是以前网上学的,这里分享下
  @GetMapping("/payIv")
    public String payLockByAqs() {
        aqsLock.lock();
        Goods goods = goodsMapper.selectByPrimaryKey(1);
        int storage = goods.getGoodsStorage();
        goods.setGoodsStorage(storage - 1);
        if (goods.getGoodsStorage() >= 0) {
            goodsMapper.updateByPrimaryKey(goods);
            System.out.println("本次购买商品一件,剩余库存" + goods.getGoodsStorage() + "件");
            aqsLock.unlock();
            return "本次购买商品一件,剩余库存" + goods.getGoodsStorage() + "件";
        } else {
            System.out.println("库存不足");
            aqsLock.unlock();
            return "库存不足";
        }
    }

  AqsLock   

//模拟下流程:a,b,c进入方法,a获取到锁,b,c也尝试获取,cas操作失败,按顺序加入队列,a解锁,唤醒队列第一个线程b,b醒来获取锁,自身为队首,移除,执行操作,解锁唤醒c重复操作
@Component
public class AqsLock {
    private volatile int state = 0;   //保证线程之间的通信
    private static final Unsafe UNSAFE = UnsafeInstance.getUnsafeInstance();
    private ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>();  //这是一个无界队列

    public void lock(){
        Thread current = Thread.currentThread();
        //如果获取锁 就无事发生,没获取到锁将进入阻塞状态
        if(acquire(current)){
            return;
        }
        waiters.add(current);
        for(;;){
            //peek 获取头元素 并尝试获取锁 ,能进行这层判断,肯定在某处被唤醒,类似于 notify
            if(acquire(current)){
                return;
            }
            LockSupport.park(current);  //LockSupport 内部封装的Unsafe类
        }
    }

    public void unlock(){
        //将标志位归原 其他线程才有获得锁的可能
        compareAndSwapState(state,0);
        Thread thread = waiters.peek();
        if(thread!=null){
            //这里可以理解为 notify,或者说 condition.signal()后 unlock()
            LockSupport.unpark(thread);
        }
    }
    private static long stateOffset;
    static{
        try {
            //获取偏移量
            stateOffset = UNSAFE.objectFieldOffset(AqsLock.class.getDeclaredField("state"));
        } catch (NoSuchFieldException e) {
            e.printStackTrace();
        }
    }
    public boolean acquire(Thread thread){
        if(state==0){
            if(compareAndSwapState(0,1)){
                // 无竞争情况下 自身是不会进入队列的 所以多一层判断    poll为获取元素并且从队列中移除,如果队列为空返回null
                if(thread == waiters.peek()){
                    waiters.poll();
                }
                return true;
            }
        }
        return false;
    }
    public final boolean compareAndSwapState(int expect,int update){
        return UNSAFE.compareAndSwapInt(this,stateOffset,expect,update);
    }
}

  Unsafe

  //构造私有 只能反射获取
public
class UnsafeInstance { public static Unsafe getUnsafeInstance() { Field theUnsafe = null; try { theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true); return (Unsafe)theUnsafe.get(null); } catch (NoSuchFieldException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } return null; } }

  case five:  synchronized

  //闲来无事 顺手测下synchronized 效率竟然比case one高, 当然 这是错误的示范,synchronized只能加在方法内,那么多件商品,不可能共享一把锁
  //synchronized锁对象 一定要保证是一个唯一对象, 想简单点 用String 的intern()方法,从常量池获取
  @GetMapping("/payV")
    public synchronized String payLockBySynchronized() {
        Goods goods = goodsMapper.selectByPrimaryKey(1);
        int storage = goods.getGoodsStorage();
        goods.setGoodsStorage(storage - 1);
        if (goods.getGoodsStorage() >= 0) {
            goodsMapper.updateByPrimaryKey(goods);
            System.out.println("本次购买商品一件,剩余库存" + goods.getGoodsStorage() + "件");
            return "本次购买商品一件,剩余库存" + goods.getGoodsStorage() + "件";
        } else {
            System.out.println("库存不足");
            return "库存不足";
        }
    }

  case six :   ReentrantLock

//  既然synchronized都测了 就再测下ReentrantLock  和synchronized 自己封装的AQS 效率都差不多
  ReentrantLock reentrantLock = new ReentrantLock();
    @GetMapping("/payVI")
    public String payLockByReentrantLock() {
        reentrantLock.lock();
        Goods goods = goodsMapper.selectByPrimaryKey(1);
        int storage = goods.getGoodsStorage();
        goods.setGoodsStorage(storage - 1);
        if (goods.getGoodsStorage() >= 0) {
            goodsMapper.updateByPrimaryKey(goods);
            System.out.println("本次购买商品一件,剩余库存" + goods.getGoodsStorage() + "件");
            myLock.lock.unlock();
            return "本次购买商品一件,剩余库存" + goods.getGoodsStorage() + "件";
        } else {
            reentrantLock.unlock();
            System.out.println("库存不足");
            return "库存不足";
        }
    }

 

  


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM