死鎖
錯誤例子
解決方式
鎖超時
錯誤例子
String lockKey="stock:product:1"; boolean isGetLock=false; try{ //假設是原子性的 獲取鎖並設置鎖10秒 isGetLock==setnx(lockKey,10); if(!isGetLock){ throw new Exception("系統繁忙!請稍后再試"); }
//模擬需要執行12秒 Thread.sleep(12); }finally { if(isGetLock){ del(lockKey); } }
假設有線程A線程B 2個線程
線程A率先拿到鎖因為我們設置的鎖10秒自動釋放(redis過期時間10秒) 而我們程序需要執行10秒以上
10.1ms秒的時候線程B進來 因為redis鎖key已經過期成功拿到鎖 並阻塞在12秒處
12秒后線程A 執行完 執行del操作 導致釋放了線程B的鎖
解決方式1
String lockKey="stock:product:1"; boolean isGetLock=false; //用來標識當前身份 String currentIndex=UUID.randomUUID().toString(); try{ //假設是原子性的 獲取鎖並設置鎖10秒 同時設置一個值為currentIndex isGetLock==setnx(lockKey,currentIndex,10); if(!isGetLock){ throw new Exception("系統繁忙!請稍后再試"); } //模擬需要執行12秒 Thread.sleep(12); }finally { if(isGetLock){ String lockValue=get(lockKey); //表示是當前線程的鎖 釋放 if(lockValue!=null&&lockValue.equals(currentIndex)) { del(lockKey); } } }
方式1優化方案
簡單一看 好像並沒有什么問題 但是需要注意 get 比較 和del並不是原子性的
比如 線程A get完之后 lockkey因為超時釋放 線程B 成功獲得鎖 線程A再執行if判斷 會刪除調線程B的鎖
改為lua腳本
if redis.call("get",KEYS[1]==ARGV[1]) then return redis.call("del","KEYS1") else return 0 end
主從切換
可重入鎖實現
/** * @Auther: liqiang * @Date: 2019/7/14 14:59 * @Description: */ public class RedisWithReentrantLock { private ThreadLocal<Map<String,Integer>> lockers=new ThreadLocal<>(); private Jedis jedis; public RedisWithReentrantLock(Jedis jedis){ this.jedis=jedis; } /** * 加鎖 */ private boolean _lock(String key){ String value=String.valueOf(System.currentTimeMillis());; return jedis.set(key,value,"nx","ex",5L)!=null; } /** * 釋放鎖 * @param key */ private void _unlock(String key){ jedis.del(key); } /** * 從線程緩存獲取map 沒有就初始化一個 * @return */ private Map<String,Integer> currentLockers(){ Map<String,Integer> refs=lockers.get(); if(refs==null){ refs=new HashMap<String,Integer>(); lockers.set(refs); } return lockers.get(); } /** * 可重入鎖 * @param key * @return */ public boolean lock(String key){ /** * 選擇map的原因是 一個線程里面可能有很多加鎖的地方 */ Map<String,Integer> lockers=currentLockers(); /** *如果存在 表示是重入加鎖 */ if(lockers.containsKey(key)){ lockers.put(key,lockers.get(key)+1); //延長過期時間 jedis.expire(key,5000); return true; } //走到這里表示是頭部第一次加鎖 加鎖並對應map數量+1 boolean isGetLock=_lock(key); lockers.put(key,1); return isGetLock; } /** * 釋放鎖 * @param key * @return */ public boolean unLock(String key){ /** * 獲得map */ Map<String,Integer> lockers=currentLockers(); /** * 表示key未加過鎖 或者釋放了 */ Integer refCnt=lockers.get(key); if(refCnt==null){ return false; } //-1 refCnt-=1; //大於0表示不是頭部鎖釋放 if(refCnt>0){ lockers.put(key,refCnt); }else{ //小於等於0 表示是頭部鎖釋放 刪除mapkey lockers.remove(key); /** * 釋放鎖 */ _unlock(key); } return true; } public static void main(String[] args) { Jedis conn = new Jedis("127.0.0.1",6379); conn.select(1); RedisWithReentrantLock redisWithReentrantLock=new RedisWithReentrantLock(conn); String lockKey="lock:key3"; redisWithReentrantLock.lock(lockKey); redisWithReentrantLock.lock(lockKey); redisWithReentrantLock.unLock(lockKey); redisWithReentrantLock.unLock(lockKey); } }
一些建議
建議涉及並發的地方能用原子性操作就用原子性
例子一
tock stock=stockDao.get(id); if(stock.getNumber()-10<0){ throw new Exception("庫存不足"); } stock.setNumber(stock.getNumber-10); stockDao.update(stock);
這種情況就算加鎖的情況 如果出現上面說的幾種極端情況 或者鎖失效了 會導致超賣以及庫存異常問題
優化方案
Stock stock=stockDao.get(id); /** * 這里可能會疑惑 下面有原子性的update加 where校驗超賣 這一步是否不需要了 * 個人理解 程序進行校驗 總比全部堆到數據庫校驗好的多 * 比如庫存賣完了 還持續有並發請求 在這里就可以全部擋在外面 */ if(stock.getNumber()-10<0){ throw new Exception("庫存不足"); } stock.setNumber(stock.getNumber-10); //原子性的update Integer updateNumber=stockDao.excuteSql("update stock set number-=10 where id=:id and number>=0",id); //表示未能成功修改 if(updateNumber<=0){ throw new Exception("庫存不足"); }
redis則使用對應redis遞增遞減
對於提供給管理員的庫存盤點 也是使用原子性遞增遞減
盤增
比如當前庫存是10 管理員調整20 則是+10 而不要直接set 20 不然並發時 10 賣了5 這個時候20才提交 則變成了20 如果+10 則變成15
盤減
比如當前庫存是10 管理員 需要調整為5 並發時減成了0 執行update stock set number-=5 where id=:id and number>=0 number>=0並不成立所以修改失敗
高並發時建議(比如秒殺場景)
將庫存全量到redis 通過Incrby 命令實現原子性遞增遞減 如果消息發送失敗需要進行補償
update stock set number-=10 where id=:id and number>=0 通過mq 隊列異步執行 否則會出現同一個庫存並發改 部分是失敗數據庫拋出waitLock tps就上不去 還會有大量請求到數據庫 可能把redis
弄掛