基於Redis的分布式鎖設計


前言

基於Redis的分布式鎖實現,原理很簡單嘛:檢測一下Key是否存在,不存在則Set Key,加鎖成功,存在則加鎖失敗。對嗎?這么簡單嗎?

如果你真這么想,那么你真的需要好好聽我講一下了。接下來,咱們找個例子研究一下。

在開始之前,咱們先定些規則:

  • 關於示例代碼:
    • 需要搭配我准備的示例代碼,該示例采用C#編寫
    • 示例中的材料Id固定為10000
    • 示例中的材料初始庫存均為100
  • 關於Redis中的Key:
    • 指示材料庫存的Key為ProductStock_10000
    • 自己實現的分布式鎖中,指示鎖的Key為DistributedLock_10000
    • RedLock.net中,指示鎖的Key為redlock:10000

假如沒有鎖

如果沒有鎖,我們可以通過Jmeter並發100個請求,看看最后庫存是不是0

/// <summary>
/// 無鎖扣減庫存
/// </summary>
/// <returns></returns>
[HttpPost("DecreaseProductStockWithNoLock")]
public async Task<string> DecreaseProductStockWithNoLock()
{
    var stockKey = GetProductStockKey(ProductId);
    var currentQuantity = (long)(await _redisDatabase.Database.StringGetAsync(stockKey));
    if (currentQuantity < 1)
        throw new Exception("庫存不足");

    var leftQuantity = currentQuantity - 1;
    await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);

    return $"剩余庫存:{leftQuantity}";
}

完了,庫存全亂了,收拾收拾,跑路吧o(╥﹏╥)o!

單應用中的鎖

提到鎖,大多數人首先想到的應該就是Monitor的語法糖lock了,這是大多數人最先接觸到的一種鎖。在單應用中,因為lock是線程鎖,所以使用該鎖一般是沒有什么問題的。

/// <summary>
/// 在單應用中扣減庫存
/// </summary>
/// <returns></returns>
[HttpPost("DecreaseProductStockInSingleApp")]
public string DecreaseProductStockInSingleApp()
{
    long leftQuantity;
    lock (_lockObj)
    {
        var stockKey = GetProductStockKey(ProductId);
        var currentQuantity = (long)_redisDatabase.Database.StringGet(stockKey);
        if (currentQuantity < 1)
            throw new Exception("庫存不足");

        leftQuantity = currentQuantity - 1;
        _redisDatabase.Database.StringSet(stockKey, leftQuantity);
    }

    return $"剩余庫存:{leftQuantity}";
}

結果和我們所期望的一樣,剩余庫存為0

但是如果我們進行應用集群,部署多份一模一樣的應用,那lock就無能為力了。接下來,咱們啟動兩個應用實例來看看

# 以開發環境運行,能看到更多信息
dotnet XXTk.Redis.DistributedLock.Api.dll --urls http://localhost:5000 --environment Development

dotnet XXTk.Redis.DistributedLock.Api.dll --urls http://localhost:5010 --environment Development

可見,一共發送了100個請求,本應該最后庫存為0的,卻還剩17個

應用集群中的鎖

版本1

很明顯,lock已經沒用了,是時候進入咱們的主題了——基於Redis的分布式鎖設計。

初步的思路是這樣的:

  1. 將材料Id作為Redis Key
  2. 如果Redis中存在該Key,則認為鎖已經被其他線程占用了
  3. 如果Redis中不存在Key,則將該Key添加到Redis中,Value則隨意賦值
  4. 當獲取到鎖的業務執行完畢后,將Key從Redis中移除

有了思路,接下來就該想一下如何實現了。很幸運,Redis的命令SETNX key value完全滿足我們的需求,實現如下:

對Redis命令不熟悉的同學,可以參考這篇Redis命令文檔

/// <summary>
/// 在應用集群中扣減庫存V1
/// </summary>
/// <returns></returns>
[HttpPost("v1/DecreaseProductStockInAppCluster")]
public async Task<string> DecreaseProductStockInAppClusterV1()
{
    var lockKey = GetDistributedLockKey(ProductId.ToString());

    // 使用 SETNX key value 命令加鎖
    if (await _redisDatabase.Database.StringSetAsync(lockKey, 1, null, When.NotExists, CommandFlags.DemandMaster))
    {
        try
        {
            var stockKey = GetProductStockKey(ProductId);
            var currentQuantity = (long)await _redisDatabase.Database.StringGetAsync(stockKey);
            if (currentQuantity < 1)
                throw new Exception("庫存不足");

            var leftQuantity = currentQuantity - 1;
            await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);

            return $"剩余庫存:{leftQuantity}";
        }
        finally
        {
            // 釋放鎖
            await _redisDatabase.Database.KeyDeleteAsync(lockKey, CommandFlags.DemandMaster);
        }
    }
    else
        throw new Exception("獲取鎖失敗");
}

我沒找到Jmeter統計請求成功或失敗次數的方法,所以使用了聚合報告,通過報告里的錯誤率手動計算。如果你知道,可以分享給我,謝謝!

通過計算,成功50次,失敗50次,而我們查到的庫存也是還剩余50個,所以已經基本實現了我們的需求。

版本2

雖然版本1已經基本實現了我們的需求,但是試想一下:

  • 代碼執行在try塊中時,應用崩潰了,導致鎖未被釋放
  • 釋放鎖時,由於網絡問題,連接Redis失敗了,導致鎖未被釋放

如果發生了以上任何情況,都無法正確的釋放鎖,導致鎖永遠無法釋放,導致死鎖。

那我們應該怎么辦呢?對,就是給鎖加一個過期時間!不過SETNX命令並沒有“過期時間”參數,那我們就需要在獲取到鎖后,通過EXPIRE命令設置鎖的過期時間。

這樣,可以嗎?當然不可以,我們需要將SETEXPIRE兩個操作合並為一個原子性操作,那我們應該怎么做呢?別擔心,Redis對SET命令進行了增強,使用SET key value EX seconds NX命令即可,最后的NX則是表示與SETNX同義。

/// <summary>
/// 在應用集群中扣減庫存V2
/// </summary>
/// <returns></returns>
[HttpPost("v2/DecreaseProductStockInAppCluster")]
public async Task<string> DecreaseProductStockInAppClusterV2()
{
    var lockKey = GetDistributedLockKey(ProductId.ToString());
    var expiresIn = TimeSpan.FromSeconds(30);

    // 使用 SET key value EX seconds NX 命令加鎖,並設置過期時間
    if (await _redisDatabase.AddAsync(lockKey, 1, expiresIn, When.NotExists, CommandFlags.DemandMaster))
    {
        try
        {
            var stockKey = GetProductStockKey(ProductId);
            var currentQuantity = (long)await _redisDatabase.Database.StringGetAsync(stockKey);
            if (currentQuantity < 1)
                throw new Exception("庫存不足");

            var leftQuantity = currentQuantity - 1;
            await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);

            return $"剩余庫存:{leftQuantity}";
        }
        finally
        {
            // 釋放鎖
            await _redisDatabase.Database.KeyDeleteAsync(lockKey, CommandFlags.DemandMaster);
        }
    }
    else
        throw new Exception("獲取鎖失敗");
}

版本3

好,死鎖的問題咱們已經解決了,那咱們的分布式鎖是不是已經完美了呢?NO!NO!NO!還是有一些問題滴:

  1. 如果線程A獲取到了鎖,並設置了鎖的過期時間是30s,而業務的執行時長需要40s,這就出現了鎖被提前釋放的問題
  2. 如果鎖被提前釋放了,然后被另一個線程B獲取到了,此時線程A的業務執行完畢了,然后執行了finally代碼塊中的鎖釋放代碼,這就把不屬於線程A而屬於線程B的鎖釋放掉了,這下可全亂套了。

是不是感覺越改問題越多?別灰心,咱們一個一個來解決,先來解決第二個“錯誤釋放了不屬於自己的鎖”的問題。為了讓線程知道哪個是自己的鎖,我們需要給線程起個唯一不重復的名字,當需要釋放鎖的時候,先檢查一下是不是自己的鎖,如果是,才釋放鎖。那這個名字放在哪里呢?咱們之前LockKey對應的Value不是沒有用嘛,那咱們就把名字存這里面,實現如下:

/// <summary>
/// 在應用集群中扣減庫存V3
/// </summary>
/// <returns></returns>
[HttpPost("v3/DecreaseProductStockInAppCluster")]
public async Task<string> DecreaseProductStockInAppClusterV3()
{
    var lockKey = GetDistributedLockKey(ProductId.ToString());
    var resourceId = Guid.NewGuid().ToString();
    var expiresIn = TimeSpan.FromSeconds(30);

    // 使用 SET key value EX seconds NX 命令加鎖,設置過期時間,並將值設置為業務Id
    if (await _redisDatabase.AddAsync(lockKey, resourceId, expiresIn, When.NotExists, CommandFlags.DemandMaster))
    {
        try
        {
            var stockKey = GetProductStockKey(ProductId);
            var currentQuantity = (long)await _redisDatabase.Database.StringGetAsync(stockKey);
            if (currentQuantity < 1)
                throw new Exception("庫存不足");

            var leftQuantity = currentQuantity - 1;
            await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);

            return $"剩余庫存:{leftQuantity}";
        }
        finally
        {
            // 釋放鎖
            if (await _redisDatabase.GetAsync<string>(lockKey) == resourceId)
            {
                _redisDatabase.Database.KeyDelete(lockKey, CommandFlags.DemandMaster);
            }
        }
    }
    else
        throw new Exception("獲取鎖失敗");
}

版本4

上面的代碼,你應該看出問題了吧?沒錯,最后的釋放鎖代碼是分兩步執行的,並不是原子操作,這肯定是不允許的啦!但是,Redis又沒有提供相關命令,所以我們只能使用lua腳本了:

/// <summary>
/// 在應用集群中扣減庫存V4
/// </summary>
/// <returns></returns>
[HttpPost("v4/DecreaseProductStockInAppCluster")]
public async Task<string> DecreaseProductStockInAppClusterV4()
{
    var lockKey = GetDistributedLockKey(ProductId.ToString());
    var resourceId = Guid.NewGuid().ToString();
    var expiresIn = TimeSpan.FromSeconds(30);

    // 使用 SET key value EX seconds NX 命令加鎖,設置過期時間,並將值設置為業務Id
    if (await _redisDatabase.AddAsync(lockKey, resourceId, expiresIn, When.NotExists, CommandFlags.DemandMaster))
    {
        try
        {
            var stockKey = GetProductStockKey(ProductId);
            var currentQuantity = (long)await _redisDatabase.Database.StringGetAsync(stockKey);
            if (currentQuantity < 1)
                throw new Exception("庫存不足");

            var leftQuantity = currentQuantity - 1;
            await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);

            return $"剩余庫存:{leftQuantity}";
        }
        finally
        {
            // 釋放鎖,使用lua腳本實現操作的原子性
            await _redisDatabase.Database.ScriptEvaluateAsync(@"
                if redis.call('get', KEYS[1]) == ARGV[1] then
                    return redis.call('del', KEYS[1])
                else
                    return 0
                end",
             keys: new RedisKey[] { lockKey },
             values: new RedisValue[] { resourceId }, 
             CommandFlags.DemandMaster);
        }
    }
    else
        throw new Exception("獲取鎖失敗");
}

如果你沒有使用我的示例代碼,而是自己寫的,可能會出現鎖未被正確釋放的問題:執行完lua腳本后,返回的是0。這可能是因為你使用了Json序列化工具來將對象序列化為字符串,以將其存放到Redis中。但是由於Json序列化字符串時,將引號(")也序列化為了("),這就會導致字符串"123"存入到Redis中為"\"123\""。具體解決辦法可以參考我實現的RedisNewtonsoftSerializer類。

版本5

最后,我們來解決最后一個問題——業務執行時長超過了鎖的過期時長,導致鎖提前被釋放。由於我們無法准確預測業務的執行時長,鎖過期時間設置的太長也不合理,所以,若業務還未執行完,我們必須能夠在鎖快過期的時候,適當的延長鎖過期時間。可以通過定時器來解決。

/// <summary>
/// 在應用集群中扣減庫存V5
/// </summary>
/// <returns></returns>
[HttpPost("v5/DecreaseProductStockInAppCluster")]
public async Task<string> DecreaseProductStockInAppClusterV5()
{
    var lockKey = GetDistributedLockKey(ProductId.ToString());
    var resourceId = Guid.NewGuid().ToString();
    var expiresIn = TimeSpan.FromSeconds(30);

    // 使用 SET key value EX seconds NX 命令加鎖,設置過期時間,並將值設置為業務Id
    if (await _redisDatabase.AddAsync(lockKey, resourceId, expiresIn, When.NotExists, CommandFlags.DemandMaster))
    {
        try
        {
            // 啟動定時器,定時延長key的過期時間
            var interval = expiresIn.TotalMilliseconds / 2;
            var timer = new System.Threading.Timer(
                callback: state => ExtendLockLifetime(lockKey, resourceId, expiresIn),
                state: null,
                dueTime: (int)interval,
                period: (int)interval);

            var stockKey = GetProductStockKey(ProductId);
            var currentQuantity = (long)await _redisDatabase.Database.StringGetAsync(stockKey);
            if (currentQuantity < 1)
                throw new Exception("庫存不足");

            var leftQuantity = currentQuantity - 1;
            await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);

            timer.Change(Timeout.Infinite, Timeout.Infinite);
            timer.Dispose();
            timer = null;

            return $"剩余庫存:{leftQuantity}";
        }
        finally
        {
            // 釋放鎖,使用lua腳本實現操作的原子性
            await _redisDatabase.Database.ScriptEvaluateAsync(@"
                if redis.call('get', KEYS[1]) == ARGV[1] then
                    return redis.call('del', KEYS[1])
                else
                    return 0
                end",
             keys: new RedisKey[] { lockKey },
             values: new RedisValue[] { resourceId },
             CommandFlags.DemandMaster);
        }
    }
    else
        throw new Exception("獲取鎖失敗");
}

private void ExtendLockLifetime(string lockKey, string resourceId, TimeSpan expiresIn)
{
    _redisDatabase.Database.ScriptEvaluate(@"
        local currentVal = redis.call('get', KEYS[1])
        if (currentVal == false) then
            return redis.call('set', KEYS[1], ARGV[1], 'PX', ARGV[2]) and 1 or 0
        elseif (currentVal == ARGV[1]) then
            return redis.call('pexpire', KEYS[1], ARGV[2])
        else
            return -1
        end
    ",
    keys: new RedisKey[] { lockKey },
    values: new RedisValue[] { resourceId, (long)expiresIn.TotalMilliseconds },
    CommandFlags.DemandMaster);
}

使用RedLock.net中的分布式鎖

以上的版本5,已經包含了分布式鎖的基本思想了,不過我寫的肯定比較簡陋,所以我給大家推薦一個比較不錯的開源實現——RedLock.net

Redis官方文檔整理了常用語言的分布式鎖實現,也梳理了RedLock的實現原理。

/// <summary>
/// 通過使用RedLock在應用集群中扣減庫存
/// </summary>
/// <returns></returns>
[HttpPost("DecreaseProductStockInAppClusterWithRedLock")]
public async Task<string> DecreaseProductStockInAppClusterWithRedLock()
{
    // 鎖的過期時間為30s,等待獲取鎖的時間為20s,如果沒有獲取到鎖,則等待1秒鍾后再次嘗試獲取
    using var redLock = await _distributedLockFactory.CreateLockAsync(
        resource: ProductId.ToString(),
        expiryTime: TimeSpan.FromSeconds(30),
        waitTime: TimeSpan.FromSeconds(20),
        retryTime: TimeSpan.FromSeconds(1)
    );

    // 確認是否已獲取到鎖
    if (redLock.IsAcquired)
    {
        var stockKey = GetProductStockKey(ProductId);
        var currentQuantity = (long)await _redisDatabase.Database.StringGetAsync(stockKey);
        if (currentQuantity < 1)
            throw new Exception("庫存不足");

        var leftQuantity = currentQuantity - 1;
        await _redisDatabase.Database.StringSetAsync(stockKey, leftQuantity);

        return $"剩余庫存:{leftQuantity}";
    }
    else
        throw new Exception("獲取鎖失敗");
}

站在Redis角度上

我們上面站在程序的角度上已經實現了分布式鎖,但是站在Redis角度上,還有幾個問題需要思考一下:

Redis宕機導致無法加鎖

如果Redis宕機了,就會導致Redis服務器不可用,從而導致無法進行加鎖。

解決方法很簡單,可以通過配置主從關系,提高Redis的高可用性,但這樣又產生了下面的問題。

Redis主從切換導致鎖失效

過程是這樣的:

  1. 客戶端 A 從 Redis master 上獲取到了鎖
  2. 在代表鎖的 Key 同步到 Redis slave 之前,master 宕機了
  3. 然后 Redis 進行主從切換, Redis slave 升級為 Redis master
  4. 客戶端 B 從新 Redis master 中獲取到了上面客戶端 A 持有的鎖。

這顯然出大問題了!因此,RedLock算法誕生了。

RedLock

我們不討論時鍾漂移,所以我們假設,多台服務器之間的時鍾漂移很小,以至於我們可以忽略它。

基本原理

首先,我們需要至少5台(大於等於5的奇數個)Redis服務器,這5台Redis之間相互獨立,沒有任何主從、集群關系。

接着,我們按照從左到右的順序,在Redis服務器上獲取鎖,我們假設

  • 鎖的過期時間為10s,
  • 加鎖的開始時間是00:00:00,
  • 在第一台服務器上獲取到鎖的時間為00:00:01,
  • 在第二台服務器上獲取到鎖的時間為00:00:02,
  • 在第三台服務器上獲取到鎖的時間為00:00:03。

現在,已經有超過半數(3/5)的Redis服務器獲取到了鎖

  • 獲取鎖所用的時間 = 最后一台獲取到鎖的Redis服務器獲取到鎖的時間 - 加鎖的開始時間
  • 鎖的有效剩余時間(TTL) = 鎖的過期時間 - 獲取鎖所用的時間

獲取鎖所用的時間 = 00:00:03 - 00:00:00 = 3s,TTL = 10s - (00:00:03 - 00:00:00) = 7s。所以,獲取鎖的時間並沒有超過鎖的有效期,我們認為獲取鎖成功。

認為鎖獲取成功的條件有兩個:

  1. 超過半數的Redis服務器獲取到了鎖
  2. 獲取鎖的時間沒有超過鎖的有效期

重試

以上列舉的示例是非常順利獲取到鎖的情況,然而很多時候,分布式鎖的獲取沒那么順利,很可能出現以下情況:

  • A已經獲取到了兩台Redis服務器的鎖
  • B已經獲取到了兩台Redis服務器的鎖
  • C已經獲取到了一台Redis服務器的鎖

如果三台客戶端的請求一直處於阻塞狀態(直到達到鎖的有效期),會嚴重影響鎖的獲取效率,這時就需要重試機制

重試機制:在一開始,同時向所有的(這里是5台)Redis服務器,發送SET key value EX senconds NX命令,當所有服務器都返回結果后,判斷是否以達成“鎖獲取成功的兩個條件”,如果達成了,則鎖獲取成功。如果沒有,則立即將已獲取的鎖釋放掉,並等待一小段時間,重復以上步驟(一般會嘗試3次)。如果這期間仍未達成“鎖獲取成功的兩個條件”,則認為鎖獲取失敗。

主從切換導致鎖失效

實際上,在RedLock算法中,如果Redis服務配置了主從關系,仍然會出現我們之前提出的問題——主從切換導致鎖失效。

為了解決這個問題,我們需要延遲Redis slave節點提升為Redis master節點的時間,延遲的時間就是鎖的有效剩余時間(TTL),這樣,就不會出現鎖失效的問題了(這似乎只存在於理論層面,如果你知道如何延遲slave提升master的時間,請一定要分享給我)。

釋放鎖

釋放鎖就很簡單了,給每台服務器都發送一個刪除鎖的命令就可以了,因為咱們的腳本已經保證了,只會刪除與當前業務有關聯的鎖。

結語

梳理了那么多,終於來到了結尾,你也發現了,基於Redis實現一個分布式鎖,並沒有想象的那么簡單,細節問題真的很多很多。另外,至少在我看來,RedLock算法實在是有些重量級了,如果不是那么在乎Redis主從切換導致的鎖不一致的問題,單Redis其實就已經足夠了。

另外,RedLock.net中使用了一個變量extendUnlockSemaphore,而不是使用的lock,具體原因可以參考:Reentrant Async Locks in C#

最后,還可以基於Zookeeper來實現分布式鎖,有興趣的可以去了解一下。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM