使用redis做分布式鎖


為什么會有這個需求:

例如一個簡單用戶的操作,一個線程去修改用戶狀態,首先在在內存中讀出用戶的狀態,然后在內存中進行修改,然后在存到數據庫中。在單線程中,這是沒有問題的。但是在多線程中由於讀取,修改,寫入是三個操作,不是原子操作(同時成功或失敗),因此在多線程中會存在數據的安全性問題。

這個問題的話,就可以用分布式鎖在限制程序的並發執行。

實現思路:

就是進來一個先占位,當別的線程進來操作的時候,發現有人占位了,就會放棄或者稍后再試。

占位的實現:

在redis中的setnx命令來實現,redis命令可以參考我這篇博客https://www.cnblogs.com/javazl/p/12657280.html,默認set命令就是存值,當key存在的時候,set就會覆蓋key的value值,而setnx則不會。當沒有key的時候,setnx就會進來先占位,當key存在了,其他的setnx就進不來了。。等到第一個執行完成后,在del命令釋放位子。

代碼實現:

public class LockTest {
    public static void main(String[] args) {
        Redis redis = new Redis();
        redis.execute(jedis->{
            Long setnx = jedis.setnx("k1", "v1");
          //setnx的返回值為long類型
            if (setnx == 1) {
                //沒人占位
                jedis.set("name", "zl");
                String name = jedis.get("name");
                System.out.println(name);
                //釋放資源
                 jedis.del("k1");
           }else{
                //有人占位,停止/暫緩 操作
           }
       });
   }
}

上邊代碼中,就是一個簡易的分布式鎖的實現,但是有一個問題。就是如果在占位后釋放前掛了。那么這個線程會一直釋放不了,也就是del命令沒有調用,后面的全部請求都阻塞到這里,鎖就變成了死鎖。因此這里需要去優化。

優化的方法就是加過期時間,確保鎖在一定時間后能夠釋放.

public class LockTest {
    public static void main(String[] args) {
        Redis redis = new Redis();
        redis.execute(jedis->{
            Long setnx = jedis.setnx("k1", "v1");
            if (setnx == 1) {
                //給鎖添加一個過期時間,防止應用在運行過程中拋出異常導致鎖無法及時得到釋放
                jedis.expire("k1", 5);
                //沒人占位
                jedis.set("name", "zl");
                String name = jedis.get("name");
                System.out.println(name);
                jedis.del("k1");
           }else{
                //有人占位,停止/暫緩 操作
           }
       });
   }

這樣處理后,就可以保證鎖可以正常的釋放。但是會有一個新的問題,就是如果在取鎖和設置過期時間服務器掛掉了,因為取鎖,也就是setnx和設置過期時間是兩個操作,不具備原子性所以不可能同時完成。這個鎖就會被一直占用,無法得到釋放,成為死鎖。那么如何解決呢?

在redis2.8之后,setnx和expireke可以通過一個命令一起執行,讓兩個操作變成一個,就會解決這個問題。

優化實現:

public class LockTest {
    public static void main(String[] args) {
        Redis redis = new Redis();
        redis.execute(jedis->{
         //將兩個操作合並成一個,nx就是setnx,ex就是expire
            String set = jedis.set("k1", "v1", new SetParams().nx().ex(5));
          //操作結果為okhuo或者error
            if (set !=null && "OK".equals(set)) {
          //給鎖添加一個過期時間,防止應用在運行過程中拋出異常導致鎖無法及時得到釋放
                jedis.expire("k1", 5);
                //沒人占位
                jedis.set("name", "zl);
                String name = jedis.get("name");
                System.out.println(name);
            //釋放資源
                jedis.del("k1");
           }else{
                //有人占位,停止/暫緩 操作
           }
       });
   }
}

用過期時間優化后,雖然解決了死鎖的問題,但是又有一個新的問題產生,就是超時問題:

舉個例子:如果要執行的業務很耗時,可能會出現紊亂,當地一個線程獲取到鎖的時候,開始執行業務代碼,但是業務代碼很耗時,假如過期時間是3秒,而業務執行需要5秒,這樣,鎖就會提前釋放,然后第二個線程獲取到鎖並開始執行。當執行到第2秒的時候,第一個鎖也執行完了,此時第一個線程會釋放第二個線程的鎖,然后第三個線程繼續獲取鎖並執行,當到第3秒的時候第二個線程執行完了,那么又會提前釋放鎖,一直如此循環,會造成線程的紊亂。

那么解決的思路主要有兩種

  1. 盡量避免耗時操作。
  2. 去處理鎖,給鎖的value設置隨機數或隨機字符串,每當要釋放的時候去判斷這個value的值,如果是的話就去釋放,如果不是就不釋放,舉個例子,假設第一個線程進來,它獲取鎖的value是1,如果發生超時就會進入下一個線程,下一個線程會獲取新的value為3,在釋放第二個所之前先去獲取value並比較,發現1不等於三,那么就不去釋放鎖。

第一種的話沒啥說的,但是第二種的話會有一個問題,就是釋放鎖會查看value,然后比較,然后釋放,會有三個操作,那么就不具備原子性,這樣操作的話,會出現死鎖。這里我們可以使用Lua腳本去處理。

Lua腳本的特點:

 1.使用方便,redis內置了對Lua腳本的支持。

 2.Lua可以在redis服務端原子性的執行多個redis命令

 3.由於網絡的原因會影響到redis的性能,因此,使用Lua可以讓多個命令同時執行,降低了網絡給redis帶來的性能問題。

在redis中如何使用Lua腳本:

1.在redis服務端寫好,然后在java業務中調用腳本

2.可以直接在java中直接去寫,寫好后,需要執行時,每次將腳本發送到redis中去執行。

創建Lua腳本:

//用redis.call調用一個redis命令,調的是get命令,這個key是從外面傳進來的key
if
redis.call("get",KEYS[1])==ARGV[1] then
//如果相等就去操作釋放命令  
return redis.call("del",KEYS[1]) else
 return 0 end

 可以給Lua腳本求一個SHA1和:

cat lua/equal.lua | redis-cli -a root script load --pipe

script load這個命令會在Redis中緩存Lua腳本,並返回腳本內容的SHA1校驗和,然后在java中調用時,傳入SHA1校驗和作為參數,這樣redis服務端就知道執行那個腳本了。

接下來在java中編寫

 public static void main(String[] args) {
        Redis redis = new Redis();
        for (int i = 0; i < 2; i++) {
            redis.execute(jedis -> {
                //1.先獲取一個隨機字符串
                String value = UUID.randomUUID().toString();
                //2.獲取鎖
                String k1 = jedis.set("k1", value, new SetParams().nx().ex(5));
                //3.判斷是否成功拿到鎖
                if (k1 != null && "OK".equals(k1)) {
                    //4. 具體的業務操作
                    jedis.set("site", "zl");
                    String site = jedis.get("site");
                    System.out.println(site);
                    //5.釋放鎖
                    jedis.evalsha("b8059ba43af6ffe8bed3db65bac35d452f8115d8", 
Arrays.asList("k1"), Arrays.asList(value));
               } else {
                    System.out.println("沒拿到鎖");
               }
           });
       }
   }
}

這樣處理的話,就解決了死鎖的問題。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM