Redisson分布式鎖之讀鎖RedissonReadLock原理


1、基本配置

配置信息與 Redisson分布式鎖之非公平鎖原理 配置一樣,可自行查看

2、使用&讀寫鎖介紹

 // 獲取key為"rwLock"的鎖對象,此時獲取到的對象是 RReadWriteLock
 RReadWriteLock rwLock = redissonClient.getReadWriteLock("rwLock");

 RLock lock = rwLock.readLock();  // 獲取讀鎖read
 // or
 RLock lock = rwLock.writeLock(); // 獲取寫鎖write
 // 2、加鎖
 lock.lock();
 try {
   // 進行具體的業務操作
   ...
 } finally {
   // 3、釋放鎖
   lock.unlock();
 }

讀寫鎖的特性:

  • 讀讀兼容、讀寫互斥、寫寫互斥、寫讀互斥

  • 鎖可以降級(當線程先獲取到寫鎖,然后再去獲取讀鎖,接着再釋放寫鎖),但不能升級(先獲取讀鎖,然后再獲取寫鎖,再釋放讀鎖)

為什么可以降級鎖,而不能升級鎖:

  • 因為鎖降級是從寫鎖降級為讀鎖,此時,同一時間拿到寫鎖的只有一個線程,可以直接降級為讀鎖,不會造成沖突;而升級鎖是從讀鎖升級為寫鎖,此時,同一時間拿到讀鎖的可能會有多個線程(讀讀不互斥),會造成沖突

同RedissonFairLock一樣,RReadWriteLock也是RedissonLock的子類 ,主要也是基於 RedissonLock 做的擴展,主要擴展在於加鎖和釋放鎖的地方,以及讀鎖的 wathcdog lua 腳本(經過重寫的),其他的邏輯都直接復用 RedissonLock

3、RedissonReadLock 

3.1 lua腳本加鎖

RedissonReadLock#tryLockInnerAsync

<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, // 獲取鎖模式 hget rwLock mode
              "local mode = redis.call('hget', KEYS[1], 'mode'); " +
              // 鎖的模式為空,即當前鎖尚未有線程獲取
              "if (mode == false) then " +
                // 利用 hset 命令設置鎖模式為讀鎖 hset rwLock mode read
                "redis.call('hset', KEYS[1], 'mode', 'read'); " +
                // 利用 hset 命令為當前線程添加加鎖次數記錄 hset rwLock UUID:threadId 1
                "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                // 利用 set 命令為當前獲取到鎖的線程添加一條超時記錄 String類型 // set {rwLock}:UUID:threadId:rwlock_timeout:1 1
                "redis.call('set', KEYS[2] .. ':1', 1); " +
                // 利用 pexpire 命令為鎖&當前線程超時記錄 添加過期時間 // pexpire {rwLock}:UUID:threadId:rwlock_timeout:1 30000
                "redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
                // pexpire rwLock 30000
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return nil; " +
              "end; " +
              // 鎖模式為讀鎖 或 鎖模式為寫鎖並且獲取寫鎖的為當前線程 hexists rwLock UUID:threadId:write
              "if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
                // 利用 hincrby 命令為當前線程增加加鎖次數,並返回當前值 hincrby rwLock UUID:threadId 1
                "local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                // 為當前線程拼接加鎖記錄 key // key = {rwLock}:UUID:threadId:rwlock_timeout:ind(加鎖的次數)
                "local key = KEYS[2] .. ':' .. ind;" +
                // 利用 set 命令為 key 添加一條加鎖超時記錄,並設置過期時間內
                "redis.call('set', key, 1); " +
                "redis.call('pexpire', key, ARGV[1]); " +
                // 獲取鎖過期的時間
                "local remainTime = redis.call('pttl', KEYS[1]); " +
                // ttl 和 30000 中選出最大值,設置為鎖的過期時間
                "redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
                "return nil; " +
              "end;" +
              // 返回鎖的過期時間
              "return redis.call('pttl', KEYS[1]);", Arrays.asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)), unit.toMillis(leaseTime), getLockName(threadId), getWriteLockName(threadId)); }

參數說明:

 KEYS = Collections.singletonList(getRawName(), getReadWriteTimeoutNamePrefix(threadId))

  • KEYS[1]:getRawName(),就是key的名稱,也就是獲取鎖對象時設置的"rwLock"

  • KEYS[2]:getReadWriteTimeoutNamePrefix(threadId)),鎖超時key,即{rwLock}:UUID:threadId:rwlock_timeout

 ARGV = unit.toMillis(leaseTime), getLockName(threadId), getWriteLockName(threadId)

  • ARGV[1]:unit.toMillis(leaseTime),鎖過期的時間,默認30s

  • ARGV[2]:getLockName(threadId),UUID:ThreadId,當前線程,UUID來唯一標識一個客戶端

  • ARGV[3]:getWriteLockName(threadId),寫鎖名稱,UUID:threadId:write

首次加讀鎖

也就是執行lua腳本的第一個分支,Redis中的數據有一個key為rwLock結構的Hash鎖,包含鎖的模式,以及加鎖的線程

 

  一個以當前加鎖線程的超時時間(String類型)

讀鎖重入

執行第二個分支

  • 鎖模式為讀鎖,當前線程可獲取讀鎖。即:redisson提供的讀寫鎖支持不同線程重復獲取鎖
  • 鎖模式為寫鎖,並且獲取寫鎖的線程為當前線程,當前線程可獲取讀鎖;即:redisson 提供的讀寫鎖,讀寫並不是完全互斥,而是支持同一線程先獲取寫鎖再獲取讀鎖,也就是 鎖的降級

關於寫鎖判斷,到分析獲取寫鎖的lua腳本時再回頭看;但是可以從這里提前知道,如果為寫鎖添加加鎖次數記錄,使用的 key 是 UUID:threadId:write,而讀鎖使用的 key 是 UUID:threadId

此時Redis中Hash結構的數據中,當前線程的的值加1,表示重入次數

並且在Redis中會再增加一條String類型的數據,表示第二次加鎖的超時時間,可以看到,當一個線程重入n次時,就會有n條對應的超時記錄,並且key最后的數字是依次遞增的

讀讀支持

也是執行第二個分支,此時Hash結構的數據中,存儲鎖的模式,獲取到鎖的線程

以及String類型的線程的超時時間

寫讀互斥

已經加了讀鎖,此時寫鎖進來,不滿足第一部分,也不滿足第二部分,直接返回當前鎖的過期時間,並訂閱消息通道 redisson_rwlock:{rwLock},然后就會在while(true)中進行自旋等待鎖的釋放

至此,整個加鎖的流程完成,從上面可以看出,在讀鎖的時候:

  1. 鎖 rwLock 是哈希表結構的

  2. 加鎖時,會對哈希表設置 mode 字段來表示這個鎖是讀鎖還是寫鎖,mode = read 表示讀鎖

  3. 加鎖時,會對哈希表設置當前線程 rwLock 的 UUID:ThreadId 字段,值表示重入次數

  4. 每次加鎖,會維護一個 key 表示這次鎖的超時時間,這個 key 的結構是 {鎖名字}:UUID:ThreadId:rwlock_timeout:重入次數

3.2 watchdog續期lua腳本 

RedissonReadLock#renewExpirationAsync

protected RFuture<Boolean> renewExpirationAsync(long threadId) { // {rwLock}:UUID:threadId:rwlock_timeout
    String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId); // timeoutPrefix.split(":" + getLockName(threadId))[0] -> {rwLock}
    String keyPrefix = getKeyPrefix(threadId, timeoutPrefix); return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 利用 hget 命令獲取當前當前線程的加鎖次數 hget rwLock UUID:threadId
          "local counter = redis.call('hget', KEYS[1], ARGV[2]); " +
          "if (counter ~= false) then " +
              // 當前線程獲取鎖次數大於0,刷新鎖過期時間 pexpire rwLock 30000
              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
              // 利用 hlen 命令獲取鎖集合里面的元素個數,然后判斷是否大於1個以上key hlen rwLock
              "if (redis.call('hlen', KEYS[1]) > 1) then " +
                 // 如果鎖集合里面key大於1個,獲取鎖集合中的所有key hkeys rwLock
                 "local keys = redis.call('hkeys', KEYS[1]); " +
                 // 遍歷每一個key
                 "for n, key in ipairs(keys) do " +
                    // hegt rwLock key 獲取其具體值
                    "counter = tonumber(redis.call('hget', KEYS[1], key)); " + 
                    // 如果值為數字類型,證明此key是加鎖成功的線程,其值表示線程加鎖的次數
                    "if type(counter) == 'number' then " + 
                        // 遍歷加鎖次數,刷新加鎖線程對應的過期時間
                        "for i=counter, 1, -1 do " +
                            // pexpire {rwLock}:key:rwlock_timeout:i 30000
                            "redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " + 
                        "end; " + 
                    "end; " + 
                "end; " +
            "end; " +
            "return 1; " +
        "end; " +
        "return 0;", Arrays.<Object>asList(getRawName(), keyPrefix), internalLockLeaseTime, getLockName(threadId)); }

參數說明 

KEYS = Arrays.asList(getRawName(), keyPrefix)

  • KEYS[1]:getRawName(),就是key的名稱,也就是獲取鎖對象時設置的"rwLock"

  • KEYS[2]:keyPrefix,{rwLock}

ARGV = internalLockLeaseTime, getLockName(threadId)

  • ARGV[1]:internalLockLeaseTime,鎖過期時間,其實就是watchdog超時時間,默認 30*1000 ms

  • ARGV[2]:getLockName(threadId),UUID:ThreadId,UUID來唯一標識一個客戶端

在上述續期的lua腳本中有一個 hlen KEYS[1](hlen rwLock) 的判斷,做這個判斷是因為 讀寫鎖 集合中,包含2個以上的鍵值對,其中一個就是鎖模式,也就是mode字段,來表示當前鎖是讀鎖還是寫鎖;后面的操作獲取鎖集合中所有的key:hkeys KEYS[1](hkeys rwLock),遍歷所有的key,並獲取其值:hget KEYS[1]key(hget rwLock key),如果key的值為數字,證明此key是加鎖成功的線程,並且value的值表示線程加鎖次數;遍歷加鎖次數利用 pexpire 為這個線程對應的加鎖記錄刷新過期時間

之所以遍歷加鎖次數,是因為在鎖重入的時候,每成功加鎖一次,redisson 都會為當前線程新增一條加鎖記錄,並且設置過期時間。

3.3 lua腳本釋放鎖 

RedissonReadLock#unlockInnerAsync

protected RFuture<Boolean> unlockInnerAsync(long threadId) { // {myLock}:UUID:threadId:rwlock_timeout
    String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId); // timeoutPrefix.split(":" + getLockName(threadId))[0] -> {myLock}
    String keyPrefix = getKeyPrefix(threadId, timeoutPrefix); return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 利用 hget 命令獲取讀寫鎖的模式
        "local mode = redis.call('hget', KEYS[1], 'mode'); " +
         // 如果鎖模式為空,往讀寫鎖對應的channel發送釋放鎖的消息
        "if (mode == false) then " +
            // publish redisson_rwlock:{rwLock} 0
            "redis.call('publish', KEYS[2], ARGV[1]); " +
            "return 1; " +
        "end; " +
        // 利用 hexists 命令判斷當前線程是否持有鎖 hexists rwLock UUID:threadId
        "local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
        "if (lockExists == 0) then " +
            "return nil;" +
        "end; " +
        // 利用 hincrby 命令,給當前線程持有鎖數量減1 hincrby rwLock UUID:threadId -1
        "local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
        // 如果持有鎖數量減1后等於0,證明當前線程不再持有鎖,那么利用 hdel 命令將鎖map中加鎖次數記錄刪掉
        "if (counter == 0) then " +
            // hdel rwLock UUID:threadId
            "redis.call('hdel', KEYS[1], ARGV[2]); " + 
        "end;" +
        // 刪除線程持有鎖對應的加鎖超時記錄 del {rwLock}:UUID:threadId:rwlock_timeout:count+1
        "redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +

        // 利用 hlen 獲取鎖map中key的數量 hlen rwLock
        "if (redis.call('hlen', KEYS[1]) > 1) then " +
            "local maxRemainTime = -3; " +
            // 獲取鎖map中 key 的數量 hkeys rwLock
            "local keys = redis.call('hkeys', KEYS[1]); " + 
            "for n, key in ipairs(keys) do " +
                // 獲取鎖map中key的值 hget rwLock key(遍歷的key)
                "counter = tonumber(redis.call('hget', KEYS[1], key)); " +
                // 如果值為數字
                "if type(counter) == 'number' then " + 
                    // 遍歷加鎖次數,刷新加鎖線程對應的過期時間
                    "for i=counter, 1, -1 do " +
                       // 利用 pttl 獲取超時時間 pptl {rwLock}:key:rwlock_timeout:i
                       "local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " + 
                       // 與 maxRemainTime 對比,將最大值,賦值給 maxRemainTime
                       "maxRemainTime = math.max(remainTime, maxRemainTime);" + 
                    "end; " + 
                "end; " + 
            "end; " +
            // 判斷 maxRemainTime 是否大於0,如果大於0,給鎖重新設置過期時間為 maxRemainTime,然后返回0結束lua腳本的執行 
            "if maxRemainTime > 0 then " +
                // pexpire rwLock maxRemainTime
                "redis.call('pexpire', KEYS[1], maxRemainTime); " +
                "return 0; " +
            "end;" + 
            // 如果當前讀寫鎖的鎖模式是寫鎖,直接返回0結束lua腳本的執行
            "if mode == 'write' then " + 
                "return 0;" + 
            "end; " +
        "end; " +
        // 當走到最后的操作,證明當前線程不但成功釋放鎖,並且釋放后當前讀寫鎖已經沒有其他線程再持有鎖了 // 直接將讀寫鎖對應的key直接刪掉,並且往讀寫鎖對應的channel中發布釋放鎖消息 // del rwLock
        "redis.call('del', KEYS[1]); " +
        // publish redisson_rwlock:{rwLock} 0
        "redis.call('publish', KEYS[2], ARGV[1]); " +
        "return 1; ", Arrays.<Object>asList(getRawName(), getChannelName(), timeoutPrefix, keyPrefix), LockPubSub.UNLOCK_MESSAGE, getLockName(threadId)); }

 參數說明

 KEYS = Arrays.asList(getRawName(), getChannelName(), timeoutPrefix, keyPrefix)

  • KEYS[1]:getRawName(),就是key的名稱,也就是獲取鎖對象時設置的"rwLock"

  • KEYS[2]:getChannelName(),訂閱消息的通道,redisson_rwlock:{rwLock}

  • KEYS[3]:timeoutPrefix,{rwLock}:UUID:threadId:rwlock_timeout

  • KEYS[4]:keyPrefix,{rwLock}

ARGV = LockPubSub.UNLOCK_MESSAGE, getLockName(threadId)

  • ARGV[1]:LockPubSub.UNLOCK_MESSAGE,Redis發布事件時的message,為 0

  • ARGV[2]:getLockName(threadId),UUID:ThreadId,UUID來唯一標識一個客戶端

到這里,整個讀鎖的流程全部結束,但是有兩個小小的疑問?

為什么給讀鎖扣減不需要先判斷鎖的模式?

  • 在鎖map中記錄加鎖次數時,讀鎖的key是UUID:threadId,而寫鎖的key是UUID:threadId:write,那么就是說讀鎖的key和寫鎖的key是不一樣的。所以解鎖的時候,直接使用對應key來扣減持有鎖次數即可。

  • 相同線程,如果獲取了寫鎖后,還是可以繼續獲取讀鎖的。所以只需要判斷鎖map有讀鎖加鎖次數記錄即可,就可以判斷當前線程是持有讀鎖的,並不需要關心當前鎖的模式。

為什么鎖map中的key都大於1了,證明肯定還有線程持有鎖,那為什么還會存在 maxRemainTime 最后小於0的情況呢?

  • 有一個點我們還沒學到,那就是其實讀寫鎖中,如果是獲取寫鎖,並不會新增一條寫鎖的超時記錄,因為讀寫鎖中,寫鎖和寫鎖是互斥的,寫鎖和讀鎖也是互斥的,即使支持當前線程先獲取寫鎖再獲取讀鎖,其實也不需要增加一條寫鎖的超時時間,因為讀寫鎖 key 的超時時間就等於寫鎖的超時時間。

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM