Redis之分布式鎖的使用


一、分布式鎖

  分布式鎖一般有三種實現方式:1. 數據庫樂觀鎖;2. 基於Redis的分布式鎖;3. 基於ZooKeeper的分布式鎖。本篇博客將介紹第二種方式,基於Redis實現分布式鎖。

二、分布式鎖的演進

業務:電商網站賣東西需要去減庫存,本篇文章假設下的訂單數量都為1;

第1版的代碼

@Service
public class RedisLockDemo {

    @Autowired
    private StringRedisTemplate redisTemplate;

    public String deduceStock() {
        ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
        //獲取redis中的庫存
        int stock = Integer.valueOf(valueOperations.get("stock"));
        if(stock > 0) {
            int newStock = stock - 1;
            valueOperations.set("stock", newStock + "");
            System.out.println("扣減庫存成功, 剩余庫存:" + newStock);
        }
        else {
            System.out.println("庫存已經為0,不能繼續扣減");
        }
     return "success"; } }

以上代碼在高並發的場景下會產生超賣的問題,所以我們修改一下代碼(增加synchronized);

第2版代碼

@Service
public class RedisLockDemo {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public String deduceStock() {
        ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
        synchronized (this) {
            //獲取redis中的庫存
            int stock = Integer.valueOf(valueOperations.get("stock"));
            if (stock > 0) {
                int newStock = stock - 1;
                //減庫存
                valueOperations.set("stock", newStock + "");
                System.out.println("扣減庫存成功, 剩余庫存:" + newStock);
            } else {
                System.out.println("庫存已經為0,不能繼續扣減");
            }
        }
     
return "success"; } }

以上代碼在服務為多實例的情況下,還是會出現超賣的問題,這個時候就要引入分布式鎖來解決了。

第3版代碼

@Service
public class RedisLockDemo {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public String deduceStock() {
        String lockKey = "lockKey";

        //加鎖: setnx
        Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, "1");
        if(null == isSuccess || isSuccess) {
            System.out.println("服務器繁忙, 請稍后重試");
            return "error";
        }
        
        //------ 執行業務邏輯 ----start------
        int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
        if (stock > 0) {
            int newStock = stock - 1;
            //執行業務操作減庫存
            redisTemplate.opsForValue().set("stock", newStock + "");
            System.out.println("扣減庫存成功, 剩余庫存:" + newStock);
        } else {
            System.out.println("庫存已經為0,不能繼續扣減");
        }
        //------ 執行業務邏輯 ----end------

        //釋放鎖
        redisTemplate.delete(lockKey);
        return "success";
    }
}

以上代碼的問題:

(1)若在執行業務邏輯的過程中出現了異常,則會造成鎖不會被釋放,使其他有關的線程全部阻塞住(死鎖);我們可以把鎖釋放操作放入到 finally 語句中來解決;

(2)若在執行業務邏輯的過程中服務給掛掉了,仍然會造成鎖不會被釋放,使其他有關的線程全部阻塞住(死鎖);我們可以給 redis 的 key 增加一個超時時間(超過指定的時間則會刪除key及其對應的數據),雖然在超時時間到達之前其他有關的線程會一直阻塞住,但是這個時間比較小,且可以解決死鎖的問題,所以這個解決方案也是可以接受的。代碼如下:

第4版代碼:

@Service
public class RedisLockDemo {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public String deduceStock() {
        String lockKey = "lockKey";

        try {
            //加鎖: setnx,expire(10秒超時)
            Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
            if(null == isSuccess || isSuccess) {
                System.out.println("服務器繁忙, 請稍后重試");
                return "error";
            }

            //------ 執行業務邏輯 ----start------
            int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int newStock = stock - 1;
                //執行業務操作減庫存
                redisTemplate.opsForValue().set("stock", newStock + "");
                System.out.println("扣減庫存成功, 剩余庫存:" + newStock);
            } else {
                System.out.println("庫存已經為0,不能繼續扣減");
            }
            //------ 執行業務邏輯 ----end------
        } finally {
            //釋放鎖 redisTemplate.delete(lockKey);
        }
        return "success";
    }
}

以上代碼還是會出現問題:

  當線程1的業務執行到一半的時候,設置的鎖超時時間到了,則鎖的key會被刪除;線程2就加鎖成功了,線程2還在執行的時候,線程1的業務執行完了,線程1接着執行刪除鎖的操作,但是線程1刪除的鎖實際上是線程2加的鎖,導致鎖失效的問題。

方法一:可以使用 “只要自己加鎖,只能自己去釋放” 來解決這個問題(第5版代碼);

第5版代碼:

@Service
public class RedisLockDemo {
    @Autowired
    private StringRedisTemplate redisTemplate;

    public String deduceStock() {
        String lockKey = "lockKey";

        String clientId = UUID.randomUUID().toString();
        try {
            //加鎖: setnx,expire Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
            if(null == isSuccess || isSuccess) {
                System.out.println("服務器繁忙, 請稍后重試");
                return "error";
            }

            //------ 執行業務邏輯 ----start------
            int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int newStock = stock - 1;
                //執行業務操作減庫存
                redisTemplate.opsForValue().set("stock", newStock + "");
                System.out.println("扣減庫存成功, 剩余庫存:" + newStock);
            } else {
                System.out.println("庫存已經為0,不能繼續扣減");
            }
            //------ 執行業務邏輯 ----end------
        } finally {
            if(clientId.equals(redisTemplate.opsForValue().get(lockKey))) { //釋放鎖 redisTemplate.delete(lockKey); }
        }
        return "success";
    }
}

以上代碼雖然解決了鎖被其他線程釋放的問題,但是還是會出現問題;當前線程的業務還沒有執行完,鎖的超時時間到了,這樣其他線程就可以去加鎖並執行業務邏輯了,這樣就有兩個線程都在執行了,有可能導致bug。

方法二:可以給鎖進行續命,每次鎖快超時的時候就給鎖重新在設置一個時間(引入另一個redis的java客戶端 Redisson

三、分布式鎖的Redisson實現

(1)引入maven坐標;

<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.13.4</version>
</dependency>

(2)增加配置文件,將Redisson注入到容器中;

@Configuration
public class RedissonConfig {

    @Bean
    public Redisson redisson() {
        Config config = new Config();
        //單機版
        //config.useSingleServer().setAddress("redis://192.168.1.1:8001").setDatabase(0);

        //集群版
        config.useClusterServers()
                .addNodeAddress("redis://192.168.1.1:8001")
                .addNodeAddress("redis://192.168.1.1:8002")
                .addNodeAddress("redis://192.168.1.2:8001")
                .addNodeAddress("redis://192.168.1.2:8002")
                .addNodeAddress("redis://192.168.1.3:8001")
                .addNodeAddress("redis://192.168.1.3:8002");
        return (Redisson) Redisson.create(config);
    }
}

(3)分布式鎖的實現

@Service
public class RedisLockDemo {
    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired private Redisson redisson;

    public String deduceStock() {
        String lockKey = "lockKey";
        RLock redissonLock = redisson.getLock(lockKey);

        try {
            //加鎖(超時默認30s), 實現鎖續命的功能(后台啟動一個timer, 默認每10s檢測一次是否持有鎖) redissonLock.lock();

            //------ 執行業務邏輯 ----start------
            int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
            if (stock > 0) {
                int newStock = stock - 1;
                //執行業務操作減庫存
                redisTemplate.opsForValue().set("stock", newStock + "");
                System.out.println("扣減庫存成功, 剩余庫存:" + newStock);
            } else {
                System.out.println("庫存已經為0,不能繼續扣減");
            }
            //------ 執行業務邏輯 ----end------
        } finally {
            //解鎖 redissonLock.unlock();
        }
        return "success";
    }
}

Redisson的實現原理

RedissonLock的使用介紹 

Redisson的官網:https://redisson.org/

// 鎖默認有效時間30秒,每10秒去檢查並重新設置超時時間
void lock();  

// 超過鎖有效時間 leaseTime,就會釋放鎖
void lock(long leaseTime, TimeUnit unit);

// 嘗試獲取鎖;成功則返回true,失敗則返回false
boolean tryLock();

// 不會去啟動定時任務;在 time 時間內還沒有獲取到鎖,則返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;

// 不會去啟動定時任務;當 waitTime 的時間到了,還沒有獲取到鎖則返回false;若獲取到鎖了,鎖的有效時間設置為 leaseTime
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;

  

 

Jedis和Redisson的比較 

Jedis提供了比Redisson更豐富的操作;

Redisson底層多使用 lua 腳本實現,對原子性的操作封裝較好,尤其是在分布式鎖上的封裝;

 

Redis實現的分布式鎖還會出現一點問題:

線程1加了鎖去執行業務了,此時Redis的 master 掛掉了,還沒有將數據同步到 slave 上。因為集群會選舉一個新的 master 出來,但是新的 master 上並沒有這個鎖;線程2可以在新選舉產生的 master 上去加鎖,然后處理業務。

(1)針對以上問題,我們可以使用 zookeeper 去實現分布式鎖,因為它是強一致性的。但是zookeeper的性能是低於Redis,使用Redis是完全夠了。

(2)當然,對於以上的問題,我們也可以使用 RedLock 去解決Redis上的那個問題,RedLock 實現的原理:給多個Redis節點發送加鎖的消息,只有超過一半以上的節點加鎖成功才算加鎖成功。

但是不推薦使用RedLock,當前的 RedLock 是有bug的,它的實現原理和 zookeeper 是差不多的。

 

高並發的高性能的Redis

怎么在高並發的場景去實現一個高性能的分布式鎖呢?

電商網站在大促的時候並發量很大:

(1)若搶購不是同一個商品,則可以增加Redis集群的cluster來實現,因為不是同一個商品,所以通過計算 key 的hash會落到不同的 cluster上;

(2)若搶購的是同一個商品,則計算key的hash值會落同一個cluster上,所以加機器也是么有用的。

我們可以使用庫存分段鎖的方式去實現。

分段鎖

  假如產品1有200個庫存,我們可以將這200個庫存分為10個段存儲(每段20個),每段存儲到一個cluster上;將key使用hash計算,使這些key最后落在不同的cluster上。

  每個下單請求鎖了一個庫存分段,然后在業務邏輯里面,就對數據庫或者是Redis中的那個分段庫存進行操作即可,包括查庫存 -> 判斷庫存是否充足 -> 扣減庫存。

可以參照 ConcurrentHashMap 的源碼去實現,它使用的就是分段鎖。

高性能分布式鎖參考鏈接:https://blog.csdn.net/eluanshi12/article/details/84616173

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM