Redis並發鎖


Redis並發鎖

1、 單線程redis為什么需要分布式鎖

雖然一個redis是單進程單線程模式,但請求並不是一定按先后順序處理的,多個請求會被redis交叉着執行,(就像單個cpu,在一個時間點只能執行一個命令,為什么多個線程執行的時候需要考慮線程安全的問題,因為程序執行的時候往往是一段代碼,並不具有原子性,所以在執行一個命令后,就可能被其他的線程搶去執行權,那么就會造成線程安全的問題),redis類似,對於check and set這種操作,可能在check之后被其余的線程搶去了執行權,之后在set就會出現問題。

這里面涉及NIO/AIO的知識,redis需要鎖也不是專門為了分布式鎖,多個請求的異步交叉處理才是根本原因,一定程度上你可以理解為出現了對共享資源的"並發"訪問,所以要鎖

2、為什么不能用java中的鎖機制

  • java中的鎖synchronized或者Reentrantlock是針對同一個進程而言
  • 在同一進程中的線程是能通過共享的堆內存來進行通信的
  • 不同的線程可以通過對共享的內存進行標記,實現加鎖,達到線程安全的目的
  • 而進程是相互隔離的,不能直接進行通信
  • 在這種情況下java中的鎖就會失效,因此要利用線程間的通信機制來實現分布式的鎖。

3、為什么redis還可以作為分布式鎖呢

  • 當不同的進程中的線程針對同一份數據進行查找與修改的過程中,需要鎖來確保在一個時間段內只能有一個線程來進行這些操作
  • 那么需要在公共內存(不是某一個進程中的內存)中來進行標記,如Redis、Memcache、數據庫或者文件等,使得不同的進程中的線程能訪問這些地方,從而達到加鎖的目的
  • Redis為單進程單線程模式,采用隊列模式將並發訪問變成串行訪問,且多客戶端對Redis的連接並不存在競爭關系。
  • redis實現的分布式鎖不僅能保證對自身數據的操作實現一致性,還能實現對其他共享數據庫或者文件的一致性,主要通過對相同的redis內存進行加鎖,實現加鎖到釋放鎖中間的代碼塊的原子性。

4、java如何實現redis分布式鎖

4.1 初始代碼:

public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() throws InterruptedException {
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //相當於 jedis.get("stock")
        if (stock > 0) {
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + ""); //相當於 jedis.set(key,value)
            System.out.println("扣減成功,剩余庫存:" + realStock + "");
        } else {
            System.out.println("扣減失敗,庫存不足");
                }

        return "end";
    }
}

但是當多個線程同時要訪問deductStock()函數的時候,會存在超賣的現象
主要因為某一個線程正好在執行jedis.set(key,value)之前,其余線程就可能已經結束了獲取數據的操作。

4.2 改進,利用SETNX加鎖

  • setnx 只能對某一個鍵進行一次賦值
  • setnx name a; setnx name b; 第一條命令會成功,而第二條會失敗

實現如下:

public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() throws InterruptedException {

        String lockKey = "protuct_001";
        Boolean result = stringRedisTemplate.opsForValue().selfAbsent(lockKey, "name");
        if (!result) {
            return "error";
        }

        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //相當於 jedis.get("stock")
        if (stock > 0) {
            int realStock = stock - 1;
            stringRedisTemplate.opsForValue().set("stock",realStock + ""); //相當於 jedis.set(key,value)
            System.out.println("扣減成功,剩余庫存:" + realStock + "");
        } else {
            System.out.println("扣減失敗,庫存不足");
                }

        stringRedisTemplate.delete(lockKey);  --------------------- 1
        return "end";
    }
}

但是,當程序運行到步驟1之前就出現異常了,那么這個鎖就會永久存在,不會被刪除,造成死鎖

4.3 使用try-catch來避免死鎖

public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() throws InterruptedException {

        String lockKey = "protuct_001";

        try {

            Boolean result = stringRedisTemplate.opsForValue().selfAbsent(lockKey, "name");
            if (!result) {
                return "error";
            }

            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //相當於 jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock",realStock + ""); //相當於 jedis.set(key,value)
                System.out.println("扣減成功,剩余庫存:" + realStock + "");
            } else {
                System.out.println("扣減失敗,庫存不足");
                    }

        } finally {
            //釋放鎖
            stringRedisTemplate.delete(lockKey);  --------------------- 1
        }

        return "end";
    }
}

雖然解決了出現異常的情況,但是程序運行到1之前,掛掉了,還是不能將鎖刪除。

4.4 將鎖加上超時時間,避免程序掛掉鎖依然存在的情況

public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() throws InterruptedException {

        String lockKey = "protuct_001";

        try {

            Boolean result = stringRedisTemplate.opsForValue().selfAbsent(lockKey, "name");  -------------- 1

            //鎖最多存活時間10秒
            stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);  ----------- 2

            if (!result) {
                return "error";
            }

            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //相當於 jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock",realStock + ""); //相當於 jedis.set(key,value)
                System.out.println("扣減成功,剩余庫存:" + realStock + "");
            } else {
                System.out.println("扣減失敗,庫存不足");
                    }

        } finally {
            //釋放鎖
            stringRedisTemplate.delete(lockKey);  --------------------- 3
        }

        return "end";
    }
}

但是,當程序執行完1處之后,在執行2處之前出現了問題,程序掛掉,那么還是沒法執行程序2。

4.5 將多行代碼封裝成原子塊,來解決上面問題

public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() throws InterruptedException {

        String lockKey = "protuct_001";

        try {
            //將兩行代碼封裝成一個原子塊代碼
            Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "name", 10, TimeUnit.SECONDS);

            if (!result) {
                return "error";
            }

            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //相當於 jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock",realStock + ""); //相當於 jedis.set(key,value)
                System.out.println("扣減成功,剩余庫存:" + realStock + "");
            } else {
                System.out.println("扣減失敗,庫存不足");
                    }

        } finally {
            //釋放鎖
            stringRedisTemplate.delete(lockKey);  --------------------- 3
        }

        return "end";
    }
}

但是,當程序A的執行時間超過鎖失效的時間,那么當鎖失效的時候,新的程序B就能開始執行,並且當程序A執行完3處代碼時候,將鎖刪除后,新的程序C又能進來,開始執行,如此,完全混亂,鎖失效

4.6 只能讓設置鎖的線程來刪除鎖

public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() throws InterruptedException {

        String lockKey = "protuct_001";
        String clientId = UUID.randomUUID().toString();

        try {
            //將兩行代碼封裝成一個原子塊代碼
            Boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);

            if (!result) {
                return "error";
            }

            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //相當於 jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock",realStock + ""); //相當於 jedis.set(key,value)
                System.out.println("扣減成功,剩余庫存:" + realStock + "");
            } else {
                System.out.println("扣減失敗,庫存不足");
                    }

        } finally {

            //判斷是否是設置鎖的線程
            if (clientId.equals(stringRedisTemplate.opsForValue().get(lockKey))) {  
                //釋放鎖
                stringRedisTemplate.delete(lockKey);  --------------------- 3
            }
        }

        return "end";
    }
}

但是,鎖失效的時間還是不合理,當鎖失效時間小於程序運行這個代碼塊的時間,還是能有其余的線程能進來這個程序

4.7 設置守護線程,延長鎖的失效時間

  • 比如,設置一個timer線程,每隔10s,然后自動設置鎖的失效時間為原來的時間
  • 當主線程執行完畢,或者掛掉,守護線程也會掛掉
  • 使用redisson框架來實現以上優化

如下:

public class IndexController {

    @Autowired
    private Redisson redisson;
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @RequestMapping("/deduct_stock")
    public String deductStock() throws InterruptedException {

        String lockKey = "protuct_001";

        RLock redisLock = redisson.getLock(lockKey);

        try {
            //設置鎖
            redisLock.lock(30, TimeUnit.SECONDS);

            int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock")); //相當於 jedis.get("stock")
            if (stock > 0) {
                int realStock = stock - 1;
                stringRedisTemplate.opsForValue().set("stock",realStock + ""); //相當於 jedis.set(key,value)
                System.out.println("扣減成功,剩余庫存:" + realStock + "");
            } else {
                System.out.println("扣減失敗,庫存不足");
                    }

        } finally {
            //釋放鎖
            redisLock.unlock();

        }

        return "end";
    }
}

但是,當Redis使用主從架構的時候,Master Redis壞掉,Slave Redis會變成新的Master Redis。那么當線程A剛給主節點上完鎖,主節點就掛掉了,從節點變成主節點,並且原先主節點的鎖還沒有同步到從節點里面,線程B來的時候訪問新的主節點(原來的從節點),這個時候線程A還在執行,B也在執行。這個時候可以用RedLock來解決。

參考鏈接:

https://www.zhihu.com/question/294599028
https://zhuanlan.zhihu.com/p/42056183
https://www.bilibili.com/video/av62657941?p=12


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM