Redisson 實現分布式鎖的原理分析


寫在前面

在了解分布式鎖具體實現方案之前,我們應該先思考一下使用分布式鎖必須要考慮的一些問題。​

  • 互斥性:在任意時刻,只能有一個進程持有鎖。

  • 防死鎖:即使有一個進程在持有鎖的期間崩潰而未能主動釋放鎖,要有其他方式去釋放鎖從而保證其他進程能獲取到鎖。

  • 加鎖和解鎖的必須是同一個進程。

  • 鎖的續期問題。

常見的分布式鎖實現方案

  • 基於 Redis 實現分布式鎖

  • 基於 Zookeeper 實現分布式鎖

本文采用第一種方案,也就是基於 Redis 的分布式鎖實現方案。

Redis 實現分布式鎖主要步驟

  1. 指定一個 key 作為鎖標記,存入 Redis 中,指定一個 唯一的用戶標識 作為 value。
  2. 當 key 不存在時才能設置值,確保同一時間只有一個客戶端進程獲得鎖,滿足 互斥性 特性。
  3. 設置一個過期時間,防止因系統異常導致沒能刪除這個 key,滿足 防死鎖 特性。
  4. 當處理完業務之后需要清除這個 key 來釋放鎖,清除 key 時需要校驗 value 值,需要滿足 只有加鎖的人才能釋放鎖

特別注意:以上實現步驟考慮到了使用分布式鎖需要考慮的互斥性、防死鎖、加鎖和解鎖必須為同一個進程等問題,但是鎖的續期無法實現。所以,博主采用 Redisson 實現 Redis 的分布式鎖,借助 Redisson 的 WatchDog 機制 能夠很好的解決鎖續期的問題,同樣 Redisson 也是 Redis 官方推薦分布式鎖實現方案,實現起來較為簡單。

Redisson 實現分布式鎖

具體實現代碼已經上傳到博主的倉庫,需要的朋友可以在公眾號內回復 【分布式鎖代碼】 獲取碼雲或 GitHub 項目下載地址。

下面從加鎖機制、鎖互斥機制、Watch dog 機制、可重入加鎖機制、鎖釋放機制、等五個方面對 Redisson 實現分布式鎖的底層原理進行分析。

加鎖原理

加鎖其實是通過一段 lua 腳本實現的,如下:

我們把這一段 lua 腳本抽出來看:

if (redis.call('exists', KEYS[1]) == 0) then " +
   "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
   "redis.call('pexpire', KEYS[1], ARGV[1]); " +
   "return nil; " +
   "end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
    "return nil; " +
    "end; " +
"return redis.call('pttl', KEYS[1]);"

這里 KEYS[1] 代表的是你加鎖的 key,比如你自己設置了加鎖的那個鎖 key 就是 “myLock”。

// create a lock
RLock lock = redisson.getLock("myLock");

這里 ARGV[1] 代表的是鎖 key 的默認生存時間,默認 30 秒。ARGV[2] 代表的是加鎖的客戶端的 ID,類似於下面這樣:285475da-9152-4c83-822a-67ee2f116a79:52。至於最后面的一個 1 是為了后面可重入做的計數統計,后面會有講解到。

我們來看一下在 Redis 中的存儲結構:

127.0.0.1:6379> HGETALL myLock
1) "285475da-9152-4c83-822a-67ee2f116a79:52"
2) "1"

上面這一段加鎖的 lua 腳本的作用是:第一段 if 判斷語句,就是用 exists myLock 命令判斷一下,如果你要加鎖的那個鎖 key 不存在的話,你就進行加鎖。如何加鎖呢?使用 hincrby 命令設置一個 hash 結構,類似於在 Redis 中使用下面的操作:

127.0.0.1:6379> HINCRBY myLock 285475da-9152-4c83-822a-67ee2f116a79:52 1
(integer) 1

接着會執行 pexpire myLock 30000 命令,設置 myLock 這個鎖 key 的生存時間是 30 秒。到此為止,加鎖完成。

有的小伙伴可能此時就有疑問了,如果此時有第二個客戶端請求加鎖呢? 這就是下面要說的鎖互斥機制。

鎖互斥機制

此時,如果客戶端 2 來嘗試加鎖,會如何呢?首先,第一個 if 判斷會執行 exists myLock,發現 myLock 這個鎖 key 已經存在了。接着第二個 if 判斷,判斷一下,myLock 鎖 key 的 hash 數據結構中,是否包含客戶端 2 的 ID,這里明顯不是,因為那里包含的是客戶端 1 的 ID。所以,客戶端 2 會執行:

return redis.call('pttl', KEYS[1]);

返回的一個數字,這個數字代表了 myLock 這個鎖 key 的剩余生存時間。

接下來我們看一下 Redissson tryLock 的主流程:

@Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        // 1.嘗試獲取鎖
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }

        // 申請鎖的耗時如果大於等於最大等待時間,則申請鎖失敗.
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }

        current = System.currentTimeMillis();

        /**
         * 2.訂閱鎖釋放事件,並通過 await 方法阻塞等待鎖釋放,有效的解決了無效的鎖申請浪費資源的問題:
         * 基於信息量,當鎖被其它資源占用時,當前線程通過 Redis 的 channel 訂閱鎖的釋放事件,一旦鎖釋放會發消息通知待等待的線程進行競爭.
         *
         * 當 this.await 返回 false,說明等待時間已經超出獲取鎖最大等待時間,取消訂閱並返回獲取鎖失敗.
         * 當 this.await 返回 true,進入循環嘗試獲取鎖.
         */
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        // await 方法內部是用 CountDownLatch 來實現阻塞,獲取 subscribe 異步執行的結果(應用了 Netty 的 Future)
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            // 計算獲取鎖的總耗時,如果大於等於最大等待時間,則獲取鎖失敗.
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;

              }

            /**
             * 3.收到鎖釋放的信號后,在最大等待時間之內,循環一次接着一次的嘗試獲取鎖
             * 獲取鎖成功,則立馬返回 true,
             * 若在最大等待時間之內還沒獲取到鎖,則認為獲取鎖失敗,返回 false 結束循環
             */
            while (true) {
                long currentTime = System.currentTimeMillis();

                // 再次嘗試獲取鎖
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }
                // 超過最大等待時間則返回 false 結束循環,獲取鎖失敗
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                /**
                 * 6.阻塞等待鎖(通過信號量(共享鎖)阻塞,等待解鎖消息):
                 */
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    //如果剩余時間(ttl)小於wait time ,就在 ttl 時間內,從Entry的信號量獲取一個許可(除非被中斷或者一直沒有可用的許可)。
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    //則就在wait time 時間范圍內等待可以通過信號量
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                // 更新剩余的等待時間(最大等待時間-已經消耗的阻塞時間)
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            // 7.無論是否獲得鎖,都要取消訂閱解鎖消息
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

流程分析:

  1. 嘗試獲取鎖,返回 null 則說明加鎖成功,返回一個數值,則說明已經存在該鎖,ttl 為鎖的剩余存活時間。

  2. 如果此時客戶端 2 進程獲取鎖失敗,那么使用客戶端 2 的線程 id(其實本質上就是進程 id)通過 Redis 的 channel 訂閱鎖釋放的事件。如果等待的過程中一直未等到鎖的釋放事件通知,當超過最大等待時間則獲取鎖失敗,返回 false,也就是第 39 行代碼。如果等到了鎖的釋放事件的通知,則開始進入一個不斷重試獲取鎖的循環。

  3. 循環中每次都先試着獲取鎖,並得到已存在的鎖的剩余存活時間。如果在重試中拿到了鎖,則直接返回。如果鎖當前還是被占用的,那么等待釋放鎖的消息,具體實現使用了 JDK 的信號量 Semaphore 來阻塞線程,當鎖釋放並發布釋放鎖的消息后,信號量的 release() 方法會被調用,此時被信號量阻塞的等待隊列中的一個線程就可以繼續嘗試獲取鎖了。

特別注意:以上過程存在一個細節,這里有必要說明一下,也是分布式鎖的一個關鍵點:當鎖正在被占用時,等待獲取鎖的進程並不是通過一個 while(true) 死循環去獲取鎖,而是利用了 Redis 的發布訂閱機制,通過 await 方法阻塞等待鎖的進程,有效的解決了無效的鎖申請浪費資源的問題

鎖的續期機制

客戶端 1 加鎖的鎖 key 默認生存時間才 30 秒,如果超過了 30 秒,客戶端 1 還想一直持有這把鎖,怎么辦呢?

Redisson 提供了一個續期機制, 只要客戶端 1 一旦加鎖成功,就會啟動一個 Watch Dog。

private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }

        // lock acquired
        if (ttlRemaining == null) {
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

注意:從以上源碼我們看到 leaseTime 必須是 -1 才會開啟 Watch Dog 機制,也就是如果你想開啟 Watch Dog 機制必須使用默認的加鎖時間為 30s。如果你自己自定義時間,超過這個時間,鎖就會自定釋放,並不會延長。

private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        renewExpiration();
    }
}

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
        Collections.<Object>singletonList(getName()),
        internalLockLeaseTime, getLockName(threadId));
}

Watch Dog 機制其實就是一個后台定時任務線程,獲取鎖成功之后,會將持有鎖的線程放入到一個 RedissonLock.EXPIRATION_RENEWAL_MAP里面,然后每隔 10 秒 (internalLockLeaseTime / 3) 檢查一下,如果客戶端 1 還持有鎖 key(判斷客戶端是否還持有 key,其實就是遍歷 EXPIRATION_RENEWAL_MAP 里面線程 id 然后根據線程 id 去 Redis 中查,如果存在就會延長 key 的時間),那么就會不斷的延長鎖 key 的生存時間。

注意:這里有一個細節問題,如果服務宕機了,Watch Dog 機制線程也就沒有了,此時就不會延長 key 的過期時間,到了 30s 之后就會自動過期了,其他線程就可以獲取到鎖。

可重入加鎖機制

Redisson 也是支持可重入鎖的,比如下面這種代碼:

@Override
public void lock() {
    RLock lock = redissonSingle.getLock("myLock");
    try {
        lock.lock();

        // 執行業務
        doBusiness();

        lock.lock();

    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        // 釋放鎖
        lock.unlock();
        lock.unlock();
        logger.info("任務執行完畢, 釋放鎖!");
    }
}

我們再分析一下加鎖那段 lua 代碼:

if (redis.call('exists', KEYS[1]) == 0) then " +
   "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
   "redis.call('pexpire', KEYS[1], ARGV[1]); " +
   "return nil; " +
   "end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
    "return nil; " +
    "end; " +
"return redis.call('pttl', KEYS[1]);"

第一個 if 判斷肯定不成立,exists myLock 會顯示鎖 key 已經存在。第二個 if 判斷會成立,因為 myLock 的 hash 數據結構中包含的那個 ID 即客戶端 1 的 ID,此時就會執行可重入加鎖的邏輯,使用:hincrby myLock 285475da-9152-4c83-822a-67ee2f116a79:52 1 對客戶端 1 的加鎖次數加 1。此時 myLock 數據結構變為下面這樣:

127.0.0.1:6379> HGETALL myLock
1) "285475da-9152-4c83-822a-67ee2f116a79:52"
2) "2"

到這里,小伙伴本就都明白了 hash 結構的 key 是鎖的名稱,field 是客戶端 ID,value 是該客戶端加鎖的次數。

這里有一個細節,如果加鎖支持可重入鎖,那么解鎖呢?

釋放鎖機制

執行

lock.unlock()

就可以釋放分布式鎖。我們來看一下釋放鎖的流程代碼:

@Override
public RFuture<Void> unlockAsync(long threadId) {
    RPromise<Void> result = new RedissonPromise<Void>();
    // 1. 異步釋放鎖
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    // 取消 Watch Dog 機制
    future.onComplete((opStatus, e) -> {
        cancelExpirationRenewal(threadId);

        if (e != null) {
            result.tryFailure(e);
            return;
        }

        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }

        result.trySuccess(null);
    });

    return result;
}
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // 判斷鎖 key 是否存在
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            // 將該客戶端對應的鎖的 hash 結構的 value 值遞減為 0 后再進行刪除
            // 然后再向通道名為 redisson_lock__channel publish 一條 UNLOCK_MESSAGE 信息
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

從以上代碼來看,釋放鎖的步驟主要分三步:

  1. 刪除鎖(這里注意可重入鎖,在上面的腳本中有詳細分析)。

  2. 廣播釋放鎖的消息,通知阻塞等待的進程(向通道名為 redisson_lock__channel publish 一條 UNLOCK_MESSAGE 信息)。

  3. 取消 Watch Dog 機制,即將 RedissonLock.EXPIRATION_RENEWAL_MAP 里面的線程 id 刪除,並且 cancel 掉 Netty 的那個定時任務線程。

方案優點

  1. Redisson 通過 Watch Dog 機制很好的解決了鎖的續期問題。

  2. 和 Zookeeper 相比較,Redisson 基於 Redis 性能更高,適合對性能要求高的場景。

  3. 通過 Redisson 實現分布式可重入鎖,比原生的 SET mylock userId NX PX milliseconds + lua 實現的效果更好些,雖然基本原理都一樣,但是它幫我們屏蔽了內部的執行細節。

  4. 在等待申請鎖資源的進程等待申請鎖的實現上也做了一些優化,減少了無效的鎖申請,提升了資源的利用率。

方案缺點

  1. 使用 Redisson 實現分布式鎖方案最大的問題就是如果你對某個 Redis Master 實例完成了加鎖,此時 Master 會異步復制給其對應的 slave 實例。但是這個過程中一旦 Master 宕機,主備切換,slave 變為了 Master。接着就會導致,客戶端 2 來嘗試加鎖的時候,在新的 Master 上完成了加鎖,而客戶端 1 也以為自己成功加了鎖,此時就會導致多個客戶端對一個分布式鎖完成了加鎖,這時系統在業務語義上一定會出現問題,導致各種臟數據的產生。所以這個就是 Redis Cluster 或者說是 Redis Master-Slave 架構的主從異步復制導致的 Redis 分布式鎖的最大缺陷(在 Redis Master 實例宕機的時候,可能導致多個客戶端同時完成加鎖)。

  2. 有個別觀點說使用 Watch Dog 機制開啟一個定時線程去不斷延長鎖的時間對系統有所損耗(這里只是網絡上的一種說法,博主查了很多資料並且結合實際生產並不認為有很大系統損耗,這個僅供大家參考)。

總結

以上就是基於 Redis 使用 Redisson 實現分布式鎖的所有原理分析,希望可以幫助小伙伴們對分布式鎖的理解有所加深。其實分析完源碼后發現基於 Redis 自己手動實現一個簡版的分布式鎖工具也並不是很難,有興趣的小伙伴可以試試。

參考


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM