Preface:
所有场景都不是生产环境,作为Demo仅供参考,测试工具为Jmeter.用nginx做分发
upstream lock{ #//不同于网上一般的解释,我测试的结果为: 未建立连接次数达到max_fails次后,该服务器会被屏蔽fail_timeout时间,这期间该服务器将不再接收请求 server 127.0.0.1:8082 weight=1 max_fails=3 fail_timeout=10s; server 127.0.0.1:8081 weight=1 max_fails=3 fail_timeout=10s; } server { listen 80; server_name localhost; location / { proxy_pass http://lock; proxy_connect_timeout 1; //这里是nginx与服务器的连接等候时间,超过该时间未建立连接则放弃,转发请求到其他服务器 } }
Test:
case one: 数据库version字段 乐观锁
//单体应用 分布式都可以用 问题就是所有请求都怼到数据库了 数据库压力大 @GetMapping("/pay") public String testPayByCAS(){ for (int i = 0; i < 5; i++) { Goods goods = goodsMapper.selectByPrimaryKey(1); int storage = goods.getGoodsStorage(); int version = goods.getVersion(); goods.setGoodsStorage(storage-1); goods.setVersion(version+1); if(goods.getGoodsStorage()>=0){ GoodsExample goodsExample = new GoodsExample(); goodsExample.createCriteria().andVersionEqualTo(version).andIdEqualTo(goods.getId()); int res = goodsMapper.updateByExample(goods,goodsExample); if(res==0){ continue; }else{ log.info("本次购买商品一件,剩余库存{}件",goods.getGoodsStorage()); return "本次购买商品一件,剩余库存"+goods.getGoodsStorage()+"件"; } }else{ log.info("库存不足"); return "库存不足"; } } return "网络延迟,请稍后尝试"; }
case:two redis setnx ex
//利用redis 做分布式锁,此处实现需要根据实际场景进行优化. 好处是能分担数据库压力,但加锁的时间无法确定,需要另启线程进行延时处理 //有个问题是 如果子线程延期6次后 主线程还未运行完毕 后续又会引发很多问题,这里可以 结合case1的乐观锁一起使用,用version字段双重保险,或许本来就应该这么做 static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20)); //这里最高并发限制50,我这里只有拿到锁了 才开启线程,早已有余,商品种类多,可以考虑加大队列/最大线程数 static { pool.prestartAllCoreThreads(); //预先创建核心线程 } @GetMapping("/payII") public String testPayLockByRedis() throws InterruptedException { //这里最好还是用userId String name = Thread.currentThread().getName(); for (int i = 0; i < 5; i++) { Boolean lock = redisTemplate.opsForValue().setIfAbsent("1", name, 10000, TimeUnit.MILLISECONDS); if (lock) { MyRunnable m = new MyRunnable(name); try { pool.execute(m); Goods goods = goodsMapper.selectByPrimaryKey(1); int storage = goods.getGoodsStorage(); goods.setGoodsStorage(storage - 1); if (goods.getGoodsStorage() >= 0) { goodsMapper.updateByPrimaryKey(goods); log.info("本次购买商品一件,剩余库存{}件", goods.getGoodsStorage()); return "本次购买商品一件,剩余库存" + goods.getGoodsStorage() + "件"; } else { log.info("库存不足"); return "库存不足"; } } catch (Exception e) { //这里抛出异常让事务回滚 , 异常部分让切面处理,优先级和事务一致,优先级一致事务先执行 throw new RunTimeException(e); } finally { m.flag = false; //这里的处理是因为 子线程在延时6次后,没有中断主线程的运行(这里无法中断,线程之间的运行是独立的,子线程抛出异常无法被主线程捕获,至多让thread设置一个 //UncaughtExceptionHandler,在子线程抛出异常后,子线程内部自己进行捕获处理逻辑,然而还是不能影响主线程),既然如此,存在30s过后主线程仍未执行完毕的可能性,此时锁已易主,如果未 //和version字段一起做保险处理,建议抛出异常,回滚事务 Object o = redisTemplate.opsForValue().get("1") if (o!=null && name.equals(o.toString())) { redisTemplate.delete("1"); }else{throw new RuntimeException("锁已失效")} } } else { continue; } } return "网络延迟,请稍后尝试"; } private class MyRunnable implements Runnable { boolean flag = true; int time; //延期次数 String name; //线程name 用userId好些 private MyRunnable(String name) {this.name = name;} @Override public void run() { try { //给予一定的处理时间 再给任务做延时处理 Thread.sleep(2000); while (flag && time++ < 6) { Object o = redisTemplate.opsForValue().get("1"); //锁存在 且是自身加的锁 给锁延期 if (o != null && o.toString().equals(name) && flag) { //如果出现判定通过 /* case1: 外面修改flag 这里延期成功 外面删除 并不影响 case2: 外面修改flag 删除 还未往redis 重新set 这里就延期 也不影响 case3: 外面修改flag 删除 其他用户往redis 重新set 这里再延期 好像也不会发生什么 */ redisTemplate.expire("1", 10000, TimeUnit.MILLISECONDS); System.out.println("延时一次"); } Thread.sleep(3000); } }catch (InterruptedException e) { e.printStackTrace(); } } }
case 3: redisson
//redisson 已经把上面我们想的到的 以及想不到的 都做了,我们只管用就好了,另外redisson还可以使用multilock,将一组锁当做一个锁来加锁和释放,保证原子性 //如我影院Demo中用的就是: 其实redisson这块认知还很浅,包括redis也是,有时间还是需要钻研钻研 RLock[] locks = new RLock[list.size()]; for (int i = 0; i < list.size(); i++) { RLock lock = redisson.getLock(String.valueOf(list.get(i).getFtpId())); ftps.add(list.get(i).getFtpId()); locks[i] = lock; } RLock multiLock = redisson.getMultiLock(locks); multiLock.lock(); ..... multiLock.unlock(); @GetMapping("/payIII") public String payLockByRedisson() { RLock lockI = redisson.getLock("1"); try { lockI.lock(15, TimeUnit.SECONDS); Goods goods = goodsMapper.selectByPrimaryKey(1); int storage = goods.getGoodsStorage(); goods.setGoodsStorage(storage - 1); if (goods.getGoodsStorage() >= 0) { goodsMapper.updateByPrimaryKey(goods); System.out.println("本次购买商品一件,剩余库存" + goods.getGoodsStorage() + "件"); return "本次购买商品一件,剩余库存" + goods.getGoodsStorage() + "件"; } else { System.out.println("库存不足"); return "库存不足"; } } finally { lockI.unlock(); } }
case four: Aqs
//下面看下 用抽象队列同步器做的锁 ,ReentrantLock的底层,当然我不是看了底层自己敲出来的,暂时还没时间去研究那些. 这是以前网上学的,这里分享下 @GetMapping("/payIv") public String payLockByAqs() { aqsLock.lock(); Goods goods = goodsMapper.selectByPrimaryKey(1); int storage = goods.getGoodsStorage(); goods.setGoodsStorage(storage - 1); if (goods.getGoodsStorage() >= 0) { goodsMapper.updateByPrimaryKey(goods); System.out.println("本次购买商品一件,剩余库存" + goods.getGoodsStorage() + "件"); aqsLock.unlock(); return "本次购买商品一件,剩余库存" + goods.getGoodsStorage() + "件"; } else { System.out.println("库存不足"); aqsLock.unlock(); return "库存不足"; } }
AqsLock
//模拟下流程:a,b,c进入方法,a获取到锁,b,c也尝试获取,cas操作失败,按顺序加入队列,a解锁,唤醒队列第一个线程b,b醒来获取锁,自身为队首,移除,执行操作,解锁唤醒c重复操作 @Component public class AqsLock { private volatile int state = 0; //保证线程之间的通信 private static final Unsafe UNSAFE = UnsafeInstance.getUnsafeInstance(); private ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>(); //这是一个无界队列 public void lock(){ Thread current = Thread.currentThread(); //如果获取锁 就无事发生,没获取到锁将进入阻塞状态 if(acquire(current)){ return; } waiters.add(current); for(;;){ //peek 获取头元素 并尝试获取锁 ,能进行这层判断,肯定在某处被唤醒,类似于 notify if(acquire(current)){ return; } LockSupport.park(current); //LockSupport 内部封装的Unsafe类 } } public void unlock(){ //将标志位归原 其他线程才有获得锁的可能 compareAndSwapState(state,0); Thread thread = waiters.peek(); if(thread!=null){ //这里可以理解为 notify,或者说 condition.signal()后 unlock() LockSupport.unpark(thread); } } private static long stateOffset; static{ try { //获取偏移量 stateOffset = UNSAFE.objectFieldOffset(AqsLock.class.getDeclaredField("state")); } catch (NoSuchFieldException e) { e.printStackTrace(); } } public boolean acquire(Thread thread){ if(state==0){ if(compareAndSwapState(0,1)){ // 无竞争情况下 自身是不会进入队列的 所以多一层判断 poll为获取元素并且从队列中移除,如果队列为空返回null if(thread == waiters.peek()){ waiters.poll(); } return true; } } return false; } public final boolean compareAndSwapState(int expect,int update){ return UNSAFE.compareAndSwapInt(this,stateOffset,expect,update); } }
Unsafe
//构造私有 只能反射获取
public class UnsafeInstance { public static Unsafe getUnsafeInstance() { Field theUnsafe = null; try { theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true); return (Unsafe)theUnsafe.get(null); } catch (NoSuchFieldException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } return null; } }
case five: synchronized
//闲来无事 顺手测下synchronized 效率竟然比case one高, 当然 这是错误的示范,synchronized只能加在方法内,那么多件商品,不可能共享一把锁 //synchronized锁对象 一定要保证是一个唯一对象, 想简单点 用String 的intern()方法,从常量池获取 @GetMapping("/payV") public synchronized String payLockBySynchronized() { Goods goods = goodsMapper.selectByPrimaryKey(1); int storage = goods.getGoodsStorage(); goods.setGoodsStorage(storage - 1); if (goods.getGoodsStorage() >= 0) { goodsMapper.updateByPrimaryKey(goods); System.out.println("本次购买商品一件,剩余库存" + goods.getGoodsStorage() + "件"); return "本次购买商品一件,剩余库存" + goods.getGoodsStorage() + "件"; } else { System.out.println("库存不足"); return "库存不足"; } }
case six : ReentrantLock
// 既然synchronized都测了 就再测下ReentrantLock 和synchronized 自己封装的AQS 效率都差不多 ReentrantLock reentrantLock = new ReentrantLock(); @GetMapping("/payVI") public String payLockByReentrantLock() { reentrantLock.lock(); Goods goods = goodsMapper.selectByPrimaryKey(1); int storage = goods.getGoodsStorage(); goods.setGoodsStorage(storage - 1); if (goods.getGoodsStorage() >= 0) { goodsMapper.updateByPrimaryKey(goods); System.out.println("本次购买商品一件,剩余库存" + goods.getGoodsStorage() + "件"); myLock.lock.unlock(); return "本次购买商品一件,剩余库存" + goods.getGoodsStorage() + "件"; } else { reentrantLock.unlock(); System.out.println("库存不足"); return "库存不足"; } }