Preface:
所有場景都不是生產環境,作為Demo僅供參考,測試工具為Jmeter.用nginx做分發
upstream lock{ #//不同於網上一般的解釋,我測試的結果為: 未建立連接次數達到max_fails次后,該服務器會被屏蔽fail_timeout時間,這期間該服務器將不再接收請求 server 127.0.0.1:8082 weight=1 max_fails=3 fail_timeout=10s; server 127.0.0.1:8081 weight=1 max_fails=3 fail_timeout=10s; } server { listen 80; server_name localhost; location / { proxy_pass http://lock; proxy_connect_timeout 1; //這里是nginx與服務器的連接等候時間,超過該時間未建立連接則放棄,轉發請求到其他服務器 } }
Test:
case one: 數據庫version字段 樂觀鎖
//單體應用 分布式都可以用 問題就是所有請求都懟到數據庫了 數據庫壓力大 @GetMapping("/pay") public String testPayByCAS(){ for (int i = 0; i < 5; i++) { Goods goods = goodsMapper.selectByPrimaryKey(1); int storage = goods.getGoodsStorage(); int version = goods.getVersion(); goods.setGoodsStorage(storage-1); goods.setVersion(version+1); if(goods.getGoodsStorage()>=0){ GoodsExample goodsExample = new GoodsExample(); goodsExample.createCriteria().andVersionEqualTo(version).andIdEqualTo(goods.getId()); int res = goodsMapper.updateByExample(goods,goodsExample); if(res==0){ continue; }else{ log.info("本次購買商品一件,剩余庫存{}件",goods.getGoodsStorage()); return "本次購買商品一件,剩余庫存"+goods.getGoodsStorage()+"件"; } }else{ log.info("庫存不足"); return "庫存不足"; } } return "網絡延遲,請稍后嘗試"; }
case:two redis setnx ex
//利用redis 做分布式鎖,此處實現需要根據實際場景進行優化. 好處是能分擔數據庫壓力,但加鎖的時間無法確定,需要另啟線程進行延時處理 //有個問題是 如果子線程延期6次后 主線程還未運行完畢 后續又會引發很多問題,這里可以 結合case1的樂觀鎖一起使用,用version字段雙重保險,或許本來就應該這么做 static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(20)); //這里最高並發限制50,我這里只有拿到鎖了 才開啟線程,早已有余,商品種類多,可以考慮加大隊列/最大線程數 static { pool.prestartAllCoreThreads(); //預先創建核心線程 } @GetMapping("/payII") public String testPayLockByRedis() throws InterruptedException { //這里最好還是用userId String name = Thread.currentThread().getName(); for (int i = 0; i < 5; i++) { Boolean lock = redisTemplate.opsForValue().setIfAbsent("1", name, 10000, TimeUnit.MILLISECONDS); if (lock) { MyRunnable m = new MyRunnable(name); try { pool.execute(m); Goods goods = goodsMapper.selectByPrimaryKey(1); int storage = goods.getGoodsStorage(); goods.setGoodsStorage(storage - 1); if (goods.getGoodsStorage() >= 0) { goodsMapper.updateByPrimaryKey(goods); log.info("本次購買商品一件,剩余庫存{}件", goods.getGoodsStorage()); return "本次購買商品一件,剩余庫存" + goods.getGoodsStorage() + "件"; } else { log.info("庫存不足"); return "庫存不足"; } } catch (Exception e) { //這里拋出異常讓事務回滾 , 異常部分讓切面處理,優先級和事務一致,優先級一致事務先執行 throw new RunTimeException(e); } finally { m.flag = false; //這里的處理是因為 子線程在延時6次后,沒有中斷主線程的運行(這里無法中斷,線程之間的運行是獨立的,子線程拋出異常無法被主線程捕獲,至多讓thread設置一個 //UncaughtExceptionHandler,在子線程拋出異常后,子線程內部自己進行捕獲處理邏輯,然而還是不能影響主線程),既然如此,存在30s過后主線程仍未執行完畢的可能性,此時鎖已易主,如果未 //和version字段一起做保險處理,建議拋出異常,回滾事務 Object o = redisTemplate.opsForValue().get("1") if (o!=null && name.equals(o.toString())) { redisTemplate.delete("1"); }else{throw new RuntimeException("鎖已失效")} } } else { continue; } } return "網絡延遲,請稍后嘗試"; } private class MyRunnable implements Runnable { boolean flag = true; int time; //延期次數 String name; //線程name 用userId好些 private MyRunnable(String name) {this.name = name;} @Override public void run() { try { //給予一定的處理時間 再給任務做延時處理 Thread.sleep(2000); while (flag && time++ < 6) { Object o = redisTemplate.opsForValue().get("1"); //鎖存在 且是自身加的鎖 給鎖延期 if (o != null && o.toString().equals(name) && flag) { //如果出現判定通過 /* case1: 外面修改flag 這里延期成功 外面刪除 並不影響 case2: 外面修改flag 刪除 還未往redis 重新set 這里就延期 也不影響 case3: 外面修改flag 刪除 其他用戶往redis 重新set 這里再延期 好像也不會發生什么 */ redisTemplate.expire("1", 10000, TimeUnit.MILLISECONDS); System.out.println("延時一次"); } Thread.sleep(3000); } }catch (InterruptedException e) { e.printStackTrace(); } } }
case 3: redisson
//redisson 已經把上面我們想的到的 以及想不到的 都做了,我們只管用就好了,另外redisson還可以使用multilock,將一組鎖當做一個鎖來加鎖和釋放,保證原子性 //如我影院Demo中用的就是: 其實redisson這塊認知還很淺,包括redis也是,有時間還是需要鑽研鑽研 RLock[] locks = new RLock[list.size()]; for (int i = 0; i < list.size(); i++) { RLock lock = redisson.getLock(String.valueOf(list.get(i).getFtpId())); ftps.add(list.get(i).getFtpId()); locks[i] = lock; } RLock multiLock = redisson.getMultiLock(locks); multiLock.lock(); ..... multiLock.unlock(); @GetMapping("/payIII") public String payLockByRedisson() { RLock lockI = redisson.getLock("1"); try { lockI.lock(15, TimeUnit.SECONDS); Goods goods = goodsMapper.selectByPrimaryKey(1); int storage = goods.getGoodsStorage(); goods.setGoodsStorage(storage - 1); if (goods.getGoodsStorage() >= 0) { goodsMapper.updateByPrimaryKey(goods); System.out.println("本次購買商品一件,剩余庫存" + goods.getGoodsStorage() + "件"); return "本次購買商品一件,剩余庫存" + goods.getGoodsStorage() + "件"; } else { System.out.println("庫存不足"); return "庫存不足"; } } finally { lockI.unlock(); } }
case four: Aqs
//下面看下 用抽象隊列同步器做的鎖 ,ReentrantLock的底層,當然我不是看了底層自己敲出來的,暫時還沒時間去研究那些. 這是以前網上學的,這里分享下 @GetMapping("/payIv") public String payLockByAqs() { aqsLock.lock(); Goods goods = goodsMapper.selectByPrimaryKey(1); int storage = goods.getGoodsStorage(); goods.setGoodsStorage(storage - 1); if (goods.getGoodsStorage() >= 0) { goodsMapper.updateByPrimaryKey(goods); System.out.println("本次購買商品一件,剩余庫存" + goods.getGoodsStorage() + "件"); aqsLock.unlock(); return "本次購買商品一件,剩余庫存" + goods.getGoodsStorage() + "件"; } else { System.out.println("庫存不足"); aqsLock.unlock(); return "庫存不足"; } }
AqsLock
//模擬下流程:a,b,c進入方法,a獲取到鎖,b,c也嘗試獲取,cas操作失敗,按順序加入隊列,a解鎖,喚醒隊列第一個線程b,b醒來獲取鎖,自身為隊首,移除,執行操作,解鎖喚醒c重復操作 @Component public class AqsLock { private volatile int state = 0; //保證線程之間的通信 private static final Unsafe UNSAFE = UnsafeInstance.getUnsafeInstance(); private ConcurrentLinkedQueue<Thread> waiters = new ConcurrentLinkedQueue<>(); //這是一個無界隊列 public void lock(){ Thread current = Thread.currentThread(); //如果獲取鎖 就無事發生,沒獲取到鎖將進入阻塞狀態 if(acquire(current)){ return; } waiters.add(current); for(;;){ //peek 獲取頭元素 並嘗試獲取鎖 ,能進行這層判斷,肯定在某處被喚醒,類似於 notify if(acquire(current)){ return; } LockSupport.park(current); //LockSupport 內部封裝的Unsafe類 } } public void unlock(){ //將標志位歸原 其他線程才有獲得鎖的可能 compareAndSwapState(state,0); Thread thread = waiters.peek(); if(thread!=null){ //這里可以理解為 notify,或者說 condition.signal()后 unlock() LockSupport.unpark(thread); } } private static long stateOffset; static{ try { //獲取偏移量 stateOffset = UNSAFE.objectFieldOffset(AqsLock.class.getDeclaredField("state")); } catch (NoSuchFieldException e) { e.printStackTrace(); } } public boolean acquire(Thread thread){ if(state==0){ if(compareAndSwapState(0,1)){ // 無競爭情況下 自身是不會進入隊列的 所以多一層判斷 poll為獲取元素並且從隊列中移除,如果隊列為空返回null if(thread == waiters.peek()){ waiters.poll(); } return true; } } return false; } public final boolean compareAndSwapState(int expect,int update){ return UNSAFE.compareAndSwapInt(this,stateOffset,expect,update); } }
Unsafe
//構造私有 只能反射獲取
public class UnsafeInstance { public static Unsafe getUnsafeInstance() { Field theUnsafe = null; try { theUnsafe = Unsafe.class.getDeclaredField("theUnsafe"); theUnsafe.setAccessible(true); return (Unsafe)theUnsafe.get(null); } catch (NoSuchFieldException e) { e.printStackTrace(); } catch (IllegalAccessException e) { e.printStackTrace(); } return null; } }
case five: synchronized
//閑來無事 順手測下synchronized 效率竟然比case one高, 當然 這是錯誤的示范,synchronized只能加在方法內,那么多件商品,不可能共享一把鎖 //synchronized鎖對象 一定要保證是一個唯一對象, 想簡單點 用String 的intern()方法,從常量池獲取 @GetMapping("/payV") public synchronized String payLockBySynchronized() { Goods goods = goodsMapper.selectByPrimaryKey(1); int storage = goods.getGoodsStorage(); goods.setGoodsStorage(storage - 1); if (goods.getGoodsStorage() >= 0) { goodsMapper.updateByPrimaryKey(goods); System.out.println("本次購買商品一件,剩余庫存" + goods.getGoodsStorage() + "件"); return "本次購買商品一件,剩余庫存" + goods.getGoodsStorage() + "件"; } else { System.out.println("庫存不足"); return "庫存不足"; } }
case six : ReentrantLock
// 既然synchronized都測了 就再測下ReentrantLock 和synchronized 自己封裝的AQS 效率都差不多 ReentrantLock reentrantLock = new ReentrantLock(); @GetMapping("/payVI") public String payLockByReentrantLock() { reentrantLock.lock(); Goods goods = goodsMapper.selectByPrimaryKey(1); int storage = goods.getGoodsStorage(); goods.setGoodsStorage(storage - 1); if (goods.getGoodsStorage() >= 0) { goodsMapper.updateByPrimaryKey(goods); System.out.println("本次購買商品一件,剩余庫存" + goods.getGoodsStorage() + "件"); myLock.lock.unlock(); return "本次購買商品一件,剩余庫存" + goods.getGoodsStorage() + "件"; } else { reentrantLock.unlock(); System.out.println("庫存不足"); return "庫存不足"; } }