基於Redis實現分布式鎖


一、Redis實現分布式鎖基本原理

 主要就是redis的setnx(id,value)指令

在Redis中,有一個不常使用的命令如下所示。

SETNX key value

這條命令的含義就是“SET if Not Exists”,即不存在的時候才會設置值。

只有在key不存在的情況下,將鍵key的值設置為value。如果key已經存在,則SETNX命令不做任何操作。

這個命令的返回值如下。

  • 命令在設置成功時返回1。

  • 命令在設置失敗時返回0。

所以,我們在分布式高並發環境下,可以使用Redis的SETNX命令來實現分布式鎖。假設此時有線程A和線程B同時訪問臨界區代碼,假設線程A首先執行了SETNX命令,並返回結果1,繼續向下執行。而此時線程B再次執行SETNX命令時,返回的結果為0,則線程B不能繼續向下執行。只有當線程A執行DELETE命令將設置的鎖狀態刪除時,線程B才會成功執行SETNX命令設置加鎖狀態后繼續向下執行。

引入分布式鎖

了解了如何使用Redis中的命令實現分布式鎖后,我們就可以對下單接口進行改造了,加入分布式鎖,如下所示。

**
* 為了演示方便,我這里就簡單定義了一個常量作為商品的id
* 實際工作中,這個商品id是前端進行下單操作傳遞過來的參數
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //通過stringRedisTemplate來調用Redis的SETNX命令,key為商品的id,value為字符串“binghe”
    //實際上,value可以為任意的字符換
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe");
   //沒有拿到鎖,返回下單失敗
    if(!isLock){
        return "failure";
    }
    int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
    if(stock > 0){
        stock -= 1;
        stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
        logger.debug("庫存扣減成功,當前庫存為:{}", stock);
    }else{
        logger.debug("庫存不足,扣減庫存失敗");
        throw new OrderException("庫存不足,扣減庫存失敗");
    }
    //業務執行完成,刪除PRODUCT_ID key
    stringRedisTemplate.delete(PRODUCT_ID);
    return "success";
}
View Code

那么,在上述代碼中,我們加入了分布式鎖的操作,那上述代碼是否能夠在高並發場景下保證業務的原子性呢?答案是可以保證業務的原子性。但是,在實際場景中,上面實現分布式鎖的代碼是不可用的!!

假設當線程A首先執行stringRedisTemplate.opsForValue()的setIfAbsent()方法返回true,繼續向下執行,正在執行業務代碼時,拋出了異常,線程A直接退出了JVM。此時,stringRedisTemplate.delete(PRODUCT_ID);代碼還沒來得及執行,之后所有的線程進入提交訂單的方法時,調用stringRedisTemplate.opsForValue()的setIfAbsent()方法都會返回false。導致后續的所有下單操作都會失敗。這就是分布式場景下的死鎖問題。

所以,上述代碼中實現分布式鎖的方式在實際場景下是不可取的!!

引入try-finally代碼塊

說到這,相信小伙伴們都能夠想到,使用try-finall代碼塊啊,接下來,我們為下單接口的方法加上try-finally代碼塊。

/**
* 為了演示方便,我這里就簡單定義了一個常量作為商品的id
* 實際工作中,這個商品id是前端進行下單操作傳遞過來的參數
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //通過stringRedisTemplate來調用Redis的SETNX命令,key為商品的id,value為字符串“binghe”
    //實際上,value可以為任意的字符換
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe");
   //沒有拿到鎖,返回下單失敗
    if(!isLock){
        return "failure";
    }
    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            stock -= 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            logger.debug("庫存扣減成功,當前庫存為:{}", stock);
        }else{
            logger.debug("庫存不足,扣減庫存失敗");
            throw new OrderException("庫存不足,扣減庫存失敗");
        }
    }finally{
         //業務執行完成,刪除PRODUCT_ID key
        stringRedisTemplate.delete(PRODUCT_ID);
    }
    return "success";
}
View Code

那么,上述代碼是否真正解決了死鎖的問題呢?我們在寫代碼時,不能只盯着代碼本身,覺得上述代碼沒啥問題了。實際上,生產環境是非常復雜的。如果線程在成功加鎖之后,執行業務代碼時,

還沒來得及執行刪除鎖標志的代碼,此時,服務器宕機了,程序並沒有優雅的退出JVM。也會使得后續的線程進入提交訂單的方法時,因無法成功的設置鎖標志位而下單失敗。所以說,上述的代碼仍然存在問題。

引入Redis超時機制

 在Redis中可以設置緩存的自動過期時間,我們可以將其引入到分布式鎖的實現中,如下代碼所示。

/**
* 為了演示方便,我這里就簡單定義了一個常量作為商品的id
* 實際工作中,這個商品id是前端進行下單操作傳遞過來的參數
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //通過stringRedisTemplate來調用Redis的SETNX命令,key為商品的id,value為字符串“binghe”
    //實際上,value可以為任意的字符換
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe");
   //沒有拿到鎖,返回下單失敗
    if(!isLock){
        return "failure";
    }
    try{
        stringRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS);
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            stock -= 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            logger.debug("庫存扣減成功,當前庫存為:{}", stock);
        }else{
            logger.debug("庫存不足,扣減庫存失敗");
            throw new OrderException("庫存不足,扣減庫存失敗");
        }
    }finally{
         //業務執行完成,刪除PRODUCT_ID key
        stringRedisTemplate.delete(PRODUCT_ID);
    }
    return "success";
}
View Code

 在上述代碼中,我們加入了如下一行代碼來為Redis中的鎖標志設置過期時間。

  stringRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS); 

此時,我們設置的過期時間為30秒。

那么問題來了,這樣是否就真正的解決了問題呢?上述程序就真的沒有坑了嗎?答案是還是有坑的!!!

“坑位”分析

我們在下單操作的方法中為分布式鎖引入了超時機制,此時的代碼還是無法真正避免死鎖的問題,那“坑位”到底在哪里呢?試想,當程序執行完stringRedisTemplate.opsForValue().setIfAbsent()方法后,正要執行stringRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS)代碼時,服務器宕機了,你還別說,生產壞境的情況非常復雜,就是這么巧,服務器就宕機了。此時,后續請求進入提交訂單的方法時,都會因為無法成功設置鎖標志而導致后續下單流程無法正常執行。

既然我們找到了上述代碼的“坑位”,那我們如何將這個”坑“填上?如何解決這個問題呢?別急,Redis已經提供了這樣的功能。我們可以在向Redis中保存數據的時候,可以同時指定數據的超時時間。所以,我們可以將代碼改造成如下所示。

/**
* 為了演示方便,我這里就簡單定義了一個常量作為商品的id
* 實際工作中,這個商品id是前端進行下單操作傳遞過來的參數
*/
public static final String PRODUCT_ID = "100001";

@RequestMapping("/submitOrder")
public String submitOrder(){
    //通過stringRedisTemplate來調用Redis的SETNX命令,key為商品的id,value為字符串“binghe”
    //實際上,value可以為任意的字符換
    Boolean isLocked = stringRedisTemplate.opsForValue().setIfAbsent(PRODUCT_ID, "binghe", 30, TimeUnit.SECONDS);
   //沒有拿到鎖,返回下單失敗
    if(!isLock){
        return "failure";
    }
    try{
        int stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
        if(stock > 0){
            stock -= 1;
            stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
            logger.debug("庫存扣減成功,當前庫存為:{}", stock);
        }else{
            logger.debug("庫存不足,扣減庫存失敗");
            throw new OrderException("庫存不足,扣減庫存失敗");
        }
    }finally{
         //業務執行完成,刪除PRODUCT_ID key
        stringRedisTemplate.delete(PRODUCT_ID);
    }
    return "success";
}
View Code

在上述代碼中,我們在向Redis中設置鎖標志位的時候就設置了超時時間。此時,只要向Redis中成功設置了數據,則即使我們的業務系統宕機,Redis中的數據過期后,也會自動刪除。后續的線程進入提交訂單的方法后,就會成功的設置鎖標志位,並向下執行正常的下單流程。

到此,上述的代碼基本上在功能角度解決了程序的死鎖問題,那么,上述程序真的就完美了嗎?哈哈,很多小伙伴肯定會說不完美!確實,上面的代碼還不是完美的,那大家知道哪里不完美嗎?接下來,我們繼續分析。

在開發集成角度分析代碼

在我們開發公共的系統組件時,比如我們這里說的分布式鎖,我們肯定會抽取一些公共的類來完成相應的功能來供系統使用。

這里,假設我們定義了一個RedisLock接口,如下所示。

public interface RedisLock{
    //加鎖操作
    boolean tryLock(String key, long timeout, TimeUnit unit);
    //解鎖操作
    void releaseLock(String key);
}
View Code

 接下來,使用RedisLockImpl類實現RedisLock接口,提供具體的加鎖和解鎖實現,如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        return stringRedisTemplate.opsForValue().setIfAbsent(key, "binghe", timeout, unit);
    }
    @Override
    public void releaseLock(String key){
        stringRedisTemplate.delete(key);
    }
}
View Code

在開發集成的角度來說,當一個線程從上到下執行時,首先對程序進行加鎖操作,然后執行業務代碼,執行完成后,再進行釋放鎖的操作。理論上,加鎖和釋放鎖時,操作的Redis Key都是一樣的。但是,如果其他開發人員在編寫代碼時,並沒有調用tryLock()方法,而是直接調用了releaseLock()方法,並且他調用releaseLock()方法傳遞的key與你調用tryLock()方法傳遞的key是一樣的。那此時就會出現問題了,他在編寫代碼時,硬生生的將你加的鎖釋放了!!!

所以,上述代碼是不安全的,別人能夠隨隨便便的將你加的鎖刪除,這就是鎖的誤刪操作,這是非常危險的,所以,上述的程序存在很嚴重的問題!!

那如何實現只有加鎖的線程才能進行相應的解鎖操作呢? 繼續向下看。

如何實現加鎖和解鎖的歸一化?

什么是加鎖和解鎖的歸一化呢?簡單點來說,就是一個線程執行了加鎖操作后,后續必須由這個線程執行解鎖操作,加鎖和解鎖操作由同一個線程來完成。

為了解決只有加鎖的線程才能進行相應的解鎖操作的問題,那么,我們就需要將加鎖和解鎖操作綁定到同一個線程中,那么,如何將加鎖操作和解鎖操作綁定到同一個線程呢?其實很簡單,相信很多小伙伴都想到了—— 使用ThreadLocal實現 。沒錯,使用ThreadLocal類確實能夠解決這個問題。

此時,我們將RedisLockImpl類的代碼修改成如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();

    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        String uuid = UUID.randomUUID().toString();
        threadLocal.set(uuid);
        return stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操作
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
             stringRedisTemplate.delete(key);   
        }
    }
}
View Code

 繼續分析

我們將加鎖和解鎖的方法改成如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;
    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();
    private String lockUUID;
    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        String uuid = UUID.randomUUID().toString();
        threadLocal.set(uuid);
        lockUUID = uuid;
        return stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操作
        if(lockUUID.equals(stringRedisTemplate.opsForValue().get(key))){
             stringRedisTemplate.delete(key);   
        }
    }
}
View Code

相信很多小伙伴都會看出上述代碼存在什么問題了!!沒錯,那就是 線程安全的問題。

所以,這里,我們需要使用ThreadLocal來解決線程安全問題。

可重入性分析

在上面的代碼中,當一個線程成功設置了鎖標志位后,其他的線程再設置鎖標志位時,就會返回失敗。還有一種場景就是在提交訂單的接口方法中,調用了服務A,服務A調用了服務B,而服務B的方法中存在對同一個商品的加鎖和解鎖操作。

所以,服務B成功設置鎖標志位后,提交訂單的接口方法繼續執行時,也不能成功設置鎖標志位了。也就是說,目前實現的分布式鎖沒有可重入性。

這里,就存在可重入性的問題了。我們希望設計的分布式鎖 具有可重入性 ,那什么是可重入性呢?簡單點來說,就是同一個線程,能夠多次獲取同一把鎖,並且能夠按照順序進行解決操作。

其實,在JDK 1.5之后提供的鎖很多都支持可重入性,比如synchronized和Lock。

如何實現可重入性呢?

映射到我們加鎖和解鎖方法時,我們如何支持同一個線程能夠多次獲取到鎖(設置鎖標志位)呢?可以這樣簡單的設計:如果當前線程沒有綁定uuid,則生成uuid綁定到當前線程,並且在Redis中設置鎖標志位。如果當前線程已經綁定了uuid,則直接返回true,證明當前線程之前已經設置了鎖標志位,也就是說已經獲取到了鎖,直接返回true。

結合以上分析,我們將提交訂單的接口方法代碼改造成如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();

    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
            threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
        }else{
            isLocked = true;   
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操作
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
             stringRedisTemplate.delete(key);   
        }
    }
}
View Code

 這樣寫看似沒有啥問題,但是大家細想一下,這樣寫就真的OK了嗎?

可重入性的問題分析

既然上面分布式鎖的可重入性是存在問題的,那我們就來分析下問題的根源在哪里!

假設我們提交訂單的方法中,首先使用RedisLock接口對代碼塊添加了分布式鎖,在加鎖后的代碼中調用了服務A,而服務A中也存在調用RedisLock接口的加鎖和解鎖操作。而多次調用RedisLock接口的加鎖操作時,只要之前的鎖沒有失效,則會直接返回true,表示成功獲取鎖。也就是說,無論調用加鎖操作多少次,最終只會成功加鎖一次。而執行完服務A中的邏輯后,在服務A中調用RedisLock接口的解鎖方法,此時,會將當前線程所有的加鎖操作獲得的鎖全部釋放掉。

我們可以使用下圖來簡單的表示這個過程。

 

 

那么問題來了,如何解決可重入性的問題呢?

解決可重入性問題

相信很多小伙伴都能夠想出使用計數器的方式來解決上面可重入性的問題,沒錯,就是使用計數器來解決。 整體流程如下所示。

 

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();

    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();

    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
            threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
        }else{
            isLocked = true;   
        }
        //加鎖成功后將計數器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操作
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
            Integer count = threadLocalInteger.get();
            //計數器減為0時釋放鎖
            if(count == null || --count <= 0){
                 stringRedisTemplate.delete(key);      
            }
        }
    }
}
View Code

至此,我們基本上解決了分布式鎖的可重入性問題。

說到這里,我還要問大家一句,上面的解決問題的方案真的沒問題了嗎?

阻塞與非阻塞鎖

在提交訂單的方法中,當獲取Redis分布式鎖失敗時,我們直接返回了failure來表示當前請求下單的操作失敗了。試想,在高並發環境下,一旦某個請求獲得了分布式鎖,那么,在這個請求釋放鎖之前,其他的請求調用下單方法時,都會返回下單失敗的信息。在真實場景中,這是非常不友好的。我們可以將后續的請求進行阻塞,直到當前請求釋放鎖后,再喚醒阻塞的請求獲得分布式鎖來執行方法。

所以,我們設計的分布式鎖需要支持 阻塞和非阻塞 的特性。

那么,如何實現阻塞呢?我們可以使用自旋來實現,繼續修改RedisLockImpl的代碼如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();

    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();

    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
            threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            //如果獲取鎖失敗,則自旋獲取鎖,直到成功
            if(!isLocked){
                for(;;){
                    isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                    if(isLocked){
                        break;
                    }
                }
            }
        }else{
            isLocked = true;   
        }
        //加鎖成功后將計數器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操作
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
            Integer count = threadLocalInteger.get();
            //計數器減為0時釋放鎖
            if(count == null || --count <= 0){
                 stringRedisTemplate.delete(key);      
            }
        }
    }
}
View Code

在分布式鎖的設計中,阻塞鎖和非阻塞鎖 是非常重要的概念,大家一定要記住這個知識點。

鎖失效問題

盡管我們實現了分布式鎖的阻塞特性,但是還有一個問題是我們不得不考慮的。那就是 鎖失效的問題。

當程序執行業務的時間超過了鎖的過期時間會發生什么呢? 想必很多小伙伴都能夠想到,那就是前面的請求沒執行完,鎖過期失效了,后面的請求獲取到分布式鎖,繼續向下執行了,程序無法做到真正的互斥,無法保證業務的原子性了。

那如何解決這個問題呢?答案就是:我們必須保證在業務代碼執行完畢后,才能釋放分布式鎖。方案是有了,那如何實現呢?

說白了,我們需要在業務代碼中,時不時的執行下面的代碼來保證在業務代碼沒執行完時,分布式鎖不會因超時而被釋放。

  springRedisTemplate.expire(PRODUCT_ID, 30, TimeUnit.SECONDS); 

這里,我們需要定義一個定時策略來執行上面的代碼,需要注意的是:我們不能等到30秒后再執行上述代碼,因為30秒時,鎖已經失效了。例如,我們可以每10秒執行一次上面的代碼。

有些小伙伴說,直接在RedisLockImpl類中添加一個while(true)循環來解決這個問題,那我們就這樣修改下RedisLockImpl類的代碼,看看有沒有啥問題。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();

    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();

    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
            threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            //如果獲取鎖失敗,則自旋獲取鎖,直到成功
            if(!isLocked){
                for(;;){
                    isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                    if(isLocked){
                        break;
                    }
                }
            }
            //定義更新鎖的過期時間
            while(true){
                Integer count = threadLocalInteger.get();
                //當前鎖已經被釋放,則退出循環
                if(count == 0 || count <= 0){
                    break;
                }
                springRedisTemplate.expire(key, 30, TimeUnit.SECONDS);
                try{
                    //每隔10秒執行一次
                    Thread.sleep(10000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        }else{
            isLocked = true;   
        }
        //加鎖成功后將計數器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操作
        if(threadLocal.get().equals(stringRedisTemplate.opsForValue().get(key))){
            Integer count = threadLocalInteger.get();
            //計數器減為0時釋放鎖
            if(count == null || --count <= 0){
                 stringRedisTemplate.delete(key);      
            }
        }
    }
}
View Code

相信小伙伴們看了代碼就會發現哪里有問題了:更新鎖過期時間的代碼肯定不能這么去寫。因為這么寫會 導致當前線程在更新鎖超時時間的while(true)循環中一直阻塞而無法返回結果。 所以,我們不能將當前線程阻塞,需要異步執行定時任務來更新鎖的過期時間。

此時,我們繼續修改RedisLockImpl類的代碼,將定時更新鎖超時的代碼放到一個單獨的線程中執行,如下所示。

public class RedisLockImpl implements RedisLock{
    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    private ThreadLocal<String> threadLocal = new ThreadLocal<String>();

    private ThreadLocal<Integer> threadLocalInteger = new ThreadLocal<Integer>();

    @Override
    public boolean tryLock(String key, long timeout, TimeUnit unit){
        Boolean isLocked = false;
        if(threadLocal.get() == null){
            String uuid = UUID.randomUUID().toString();
            threadLocal.set(uuid);
            isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
            //如果獲取鎖失敗,則自旋獲取鎖,直到成功
            if(!isLocked){
                for(;;){
                    isLocked = stringRedisTemplate.opsForValue().setIfAbsent(key, uuid, timeout, unit);
                    if(isLocked){
                        break;
                    }
                }
            }
            //啟動新線程來執行定時任務,更新鎖過期時間
           new Thread(new UpdateLockTimeoutTask(uuid, stringRedisTemplate)).start();
        }else{
            isLocked = true;   
        }
        //加鎖成功后將計數器加1
        if(isLocked){
            Integer count = threadLocalInteger.get() == null ? 0 : threadLocalInteger.get();
            threadLocalInteger.set(count++);
        }
        return isLocked;
    }
    @Override
    public void releaseLock(String key){
        //當前線程中綁定的uuid與Redis中的uuid相同時,再執行刪除鎖的操作
        String uuid = stringRedisTemplate.opsForValue().get(key);
        if(threadLocal.get().equals(uuid)){
            Integer count = threadLocalInteger.get();
            //計數器減為0時釋放鎖
            if(count == null || --count <= 0){
                 stringRedisTemplate.delete(key); 
                //獲取更新鎖超時時間的線程並中斷
                long threadId = stringRedisTemplate.opsForValue().get(uuid);
                Thread updateLockTimeoutThread = ThreadUtils.getThreadByThreadId(threadId);
                if(updateLockTimeoutThread != null){
                     //中斷更新鎖超時時間的線程
                    updateLockTimeoutThread.interrupt();   
                    stringRedisTemplate.delete(uuid);
                }
            }
        }
    }
}
View Code

 創建UpdateLockTimeoutTask類來執行更新鎖超時的時間。

public class UpdateLockTimeoutTask implements Runnable{
    //uuid
    private long uuid;
    private StringRedisTemplate stringRedisTemplate;
    public UpdateLockTimeoutTask(long uuid, StringRedisTemplate stringRedisTemplate){
        this.uuid = uuid;
        this.stringRedisTemplate = stringRedisTemplate;
    }
    @Override
    public void run(){
        //以uuid為key,當前線程id為value保存到Redis中
        stringRedisTemplate.opsForValue().set(uuid, Thread.currentThread().getId());
         //定義更新鎖的過期時間
        while(true){
            springRedisTemplate.expire(key, 30, TimeUnit.SECONDS);
            try{
                //每隔10秒執行一次
                Thread.sleep(10000);
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }
}
View Code

 接下來,我們定義一個ThreadUtils工具類,這個工具類中有一個根據線程id獲取線程的方法getThreadByThreadId(long threadId)。

public class ThreadUtils{
    //根據線程id獲取線程句柄
    public static Thread getThreadByThreadId(long threadId){
        ThreadGroup group = Thread.currentThread().getThreadGroup();
        while(group != null){
            Thread[] threads = new Thread[(int)(group.activeCount() * 1.2)];
            int count = group.enumerate(threads, true);
            for(int i = 0; i < count; i++){
                if(threadId == threads[i].getId()){
                    return threads[i];
                }
            }
        }
    }
}
View Code

上述解決分布式鎖失效的問題在分布式鎖領域有一個專業的術語叫做 “異步續命” 。需要注意的是:當業務代碼執行完畢后,我們需要停止更新鎖超時時間的線程。所以,這里,我對程序的改動是比較大的,首先,將更新鎖超時的時間任務重新定義為一個UpdateLockTimeoutTask類,並將uuid和StringRedisTemplate注入到任務類中,在執行定時更新鎖超時時間時,首先將當前線程保存到Redis中,其中Key為傳遞進來的uuid。

在首先獲取分布式鎖后,重新啟動線程,並將uuid和StringRedisTemplate傳遞到任務類中執行任務。當業務代碼執行完畢后,調用releaseLock()方法釋放鎖時,我們會通過uuid從Redis中獲取更新鎖超時時間的線程id,並通過線程id獲取到更新鎖超時時間的線程,調用線程的interrupt()方法來中斷線程。

此時,當分布式鎖釋放后,更新鎖超時的線程就會由於線程中斷而退出了。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM