redis-分布式鎖-設計與使用


死鎖

錯誤例子

 

 

 

解決方式

 防止死鎖 通過設置超時時間
 不要使用setnx key   expire 20  不能保證原子性 如果setnx程序就掛了 沒有執行expire就死鎖了
 reidis2.8版本提供 set lock:key1 true ex 5 nx 方式 保證了  setnx+expire原子性方式執行(秒為單位)

鎖超時

錯誤例子

        String lockKey="stock:product:1";
        boolean  isGetLock=false;
        try{
            //假設是原子性的 獲取鎖並設置鎖10秒
            isGetLock==setnx(lockKey,10);
            if(!isGetLock){
                throw  new Exception("系統繁忙!請稍后再試");
            }
//模擬需要執行12秒 Thread.sleep(
12); }finally { if(isGetLock){ del(lockKey); } }

假設有線程A線程B 2個線程

線程A率先拿到鎖因為我們設置的鎖10秒自動釋放(redis過期時間10秒) 而我們程序需要執行10秒以上

10.1ms秒的時候線程B進來 因為redis鎖key已經過期成功拿到鎖 並阻塞在12秒處

12秒后線程A 執行完 執行del操作 導致釋放了線程B的鎖

解決方式1

 String lockKey="stock:product:1";
        boolean  isGetLock=false;
        //用來標識當前身份
        String currentIndex=UUID.randomUUID().toString();
        try{
            //假設是原子性的 獲取鎖並設置鎖10秒 同時設置一個值為currentIndex
            isGetLock==setnx(lockKey,currentIndex,10);
            if(!isGetLock){
                throw  new Exception("系統繁忙!請稍后再試");
            }
            //模擬需要執行12秒
            Thread.sleep(12);
        }finally {
            if(isGetLock){
                String lockValue=get(lockKey);
                //表示是當前線程的鎖 釋放
                if(lockValue!=null&&lockValue.equals(currentIndex)) {
                    del(lockKey);
                }
            }
        }

方式1優化方案

簡單一看 好像並沒有什么問題 但是需要注意 get 比較 和del並不是原子性的

比如 線程A get完之后 lockkey因為超時釋放   線程B 成功獲得鎖    線程A再執行if判斷 會刪除調線程B的鎖

改為lua腳本 

if redis.call("get",KEYS[1]==ARGV[1])  then
return redis.call("del","KEYS1")
else
return 0
end

主從切換

線程A從主節點加鎖成功  這個時候主節點掛掉,從節點替換主節點 鎖數據並沒有同步過來 導致2個線程會獲得鎖  只會在 掛掉時 從節點還未同步時導致這樣的情況 極少情況發生 不過一般業務場景都能接受
 

可重入鎖實現

/**
 * @Auther: liqiang
 * @Date: 2019/7/14 14:59
 * @Description:
 */
public class RedisWithReentrantLock {
    private ThreadLocal<Map<String,Integer>> lockers=new ThreadLocal<>();
    private Jedis jedis;
    public  RedisWithReentrantLock(Jedis jedis){
        this.jedis=jedis;
    }
    /**
     * 加鎖
     */
    private boolean _lock(String key){
        String value=String.valueOf(System.currentTimeMillis());;
        return jedis.set(key,value,"nx","ex",5L)!=null;
    }
    /**
     * 釋放鎖
     * @param key
     */
    private void  _unlock(String key){
        jedis.del(key);
    }

    /**
     * 從線程緩存獲取map 沒有就初始化一個
     * @return
     */
    private  Map<String,Integer> currentLockers(){
        Map<String,Integer> refs=lockers.get();
        if(refs==null){
            refs=new HashMap<String,Integer>();
            lockers.set(refs);
        }
        return lockers.get();
    }

    /**
     * 可重入鎖
     * @param key
     * @return
     */
    public boolean lock(String key){
        /**
         * 選擇map的原因是 一個線程里面可能有很多加鎖的地方
         */
        Map<String,Integer> lockers=currentLockers();
        /**
         *如果存在 表示是重入加鎖
         */
        if(lockers.containsKey(key)){
            lockers.put(key,lockers.get(key)+1);
            //延長過期時間
            jedis.expire(key,5000);
            return true;
        }
        //走到這里表示是頭部第一次加鎖 加鎖並對應map數量+1
        boolean isGetLock=_lock(key);
        lockers.put(key,1);
        return  isGetLock;
    }

    /**
     * 釋放鎖
     * @param key
     * @return
     */
    public boolean unLock(String key){
        /**
         * 獲得map
         */
        Map<String,Integer> lockers=currentLockers();
        /**
         * 表示key未加過鎖 或者釋放了
         */
        Integer refCnt=lockers.get(key);
        if(refCnt==null){
            return false;
        }
        //-1
        refCnt-=1;
        //大於0表示不是頭部鎖釋放
        if(refCnt>0){
            lockers.put(key,refCnt);
        }else{
            //小於等於0 表示是頭部鎖釋放 刪除mapkey
            lockers.remove(key);
            /**
             * 釋放鎖
             */
            _unlock(key);
        }
        return true;
    }
    public static void main(String[] args) {
        Jedis conn = new Jedis("127.0.0.1",6379);
        conn.select(1);
        RedisWithReentrantLock redisWithReentrantLock=new RedisWithReentrantLock(conn);
        String lockKey="lock:key3";
        redisWithReentrantLock.lock(lockKey);
        redisWithReentrantLock.lock(lockKey);

        redisWithReentrantLock.unLock(lockKey);
        redisWithReentrantLock.unLock(lockKey);
    }
}

一些建議

建議涉及並發的地方能用原子性操作就用原子性

例子一

       tock stock=stockDao.get(id);
        if(stock.getNumber()-10<0){
            throw new Exception("庫存不足");
        }
        stock.setNumber(stock.getNumber-10);
        stockDao.update(stock);

這種情況就算加鎖的情況 如果出現上面說的幾種極端情況 或者鎖失效了 會導致超賣以及庫存異常問題

優化方案

         Stock stock=stockDao.get(id);
        /**
         * 這里可能會疑惑 下面有原子性的update加 where校驗超賣 這一步是否不需要了
         * 個人理解 程序進行校驗 總比全部堆到數據庫校驗好的多
         * 比如庫存賣完了 還持續有並發請求 在這里就可以全部擋在外面
         */
        if(stock.getNumber()-10<0){
            throw new Exception("庫存不足");
        }
        stock.setNumber(stock.getNumber-10);
        //原子性的update
        Integer updateNumber=stockDao.excuteSql("update stock set number-=10 where id=:id and number>=0",id);
        //表示未能成功修改 
        if(updateNumber<=0){
            throw new Exception("庫存不足");
        }

 redis則使用對應redis遞增遞減

對於提供給管理員的庫存盤點 也是使用原子性遞增遞減 

盤增

比如當前庫存是10  管理員調整20  則是+10  而不要直接set 20   不然並發時 10 賣了5  這個時候20才提交 則變成了20  如果+10 則變成15

盤減

比如當前庫存是10 管理員 需要調整為5    並發時減成了0  執行update stock set number-=5 where id=:id and number>=0   number>=0並不成立所以修改失敗

高並發時建議(比如秒殺場景)

將庫存全量到redis  通過Incrby 命令實現原子性遞增遞減   如果消息發送失敗需要進行補償

update stock set number-=10 where id=:id and number>=0 通過mq 隊列異步執行 否則會出現同一個庫存並發改 部分是失敗數據庫拋出waitLock  tps就上不去 還會有大量請求到數據庫 可能把redis

弄掛

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM