1、基本配置
配置信息與 Redisson分布式鎖之非公平鎖原理 配置一樣,可自行查看
2、使用&讀寫鎖介紹
// 獲取key為"rwLock"的鎖對象,此時獲取到的對象是 RReadWriteLock RReadWriteLock rwLock = redissonClient.getReadWriteLock("rwLock"); RLock lock = rwLock.readLock(); // 獲取讀鎖read // or RLock lock = rwLock.writeLock(); // 獲取寫鎖write // 2、加鎖 lock.lock(); try { // 進行具體的業務操作 ... } finally { // 3、釋放鎖 lock.unlock(); }
讀寫鎖的特性:
-
讀讀兼容、讀寫互斥、寫寫互斥、寫讀互斥
-
鎖可以降級(當線程先獲取到寫鎖,然后再去獲取讀鎖,接着再釋放寫鎖),但不能升級(先獲取讀鎖,然后再獲取寫鎖,再釋放讀鎖)
為什么可以降級鎖,而不能升級鎖:
- 因為鎖降級是從寫鎖降級為讀鎖,此時,同一時間拿到寫鎖的只有一個線程,可以直接降級為讀鎖,不會造成沖突;而升級鎖是從讀鎖升級為寫鎖,此時,同一時間拿到讀鎖的可能會有多個線程(讀讀不互斥),會造成沖突
同RedissonFairLock一樣,RReadWriteLock也是RedissonLock的子類 ,主要也是基於 RedissonLock 做的擴展,主要擴展在於加鎖和釋放鎖的地方,以及讀鎖的 wathcdog lua 腳本(經過重寫的),其他的邏輯都直接復用 RedissonLock
3、RedissonReadLock
3.1 lua腳本加鎖
RedissonReadLock#tryLockInnerAsync
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command, // 獲取鎖模式 hget rwLock mode
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
// 鎖的模式為空,即當前鎖尚未有線程獲取
"if (mode == false) then " +
// 利用 hset 命令設置鎖模式為讀鎖 hset rwLock mode read
"redis.call('hset', KEYS[1], 'mode', 'read'); " +
// 利用 hset 命令為當前線程添加加鎖次數記錄 hset rwLock UUID:threadId 1
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
// 利用 set 命令為當前獲取到鎖的線程添加一條超時記錄 String類型 // set {rwLock}:UUID:threadId:rwlock_timeout:1 1
"redis.call('set', KEYS[2] .. ':1', 1); " +
// 利用 pexpire 命令為鎖&當前線程超時記錄 添加過期時間 // pexpire {rwLock}:UUID:threadId:rwlock_timeout:1 30000
"redis.call('pexpire', KEYS[2] .. ':1', ARGV[1]); " +
// pexpire rwLock 30000
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 鎖模式為讀鎖 或 鎖模式為寫鎖並且獲取寫鎖的為當前線程 hexists rwLock UUID:threadId:write
"if (mode == 'read') or (mode == 'write' and redis.call('hexists', KEYS[1], ARGV[3]) == 1) then " +
// 利用 hincrby 命令為當前線程增加加鎖次數,並返回當前值 hincrby rwLock UUID:threadId 1
"local ind = redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
// 為當前線程拼接加鎖記錄 key // key = {rwLock}:UUID:threadId:rwlock_timeout:ind(加鎖的次數)
"local key = KEYS[2] .. ':' .. ind;" +
// 利用 set 命令為 key 添加一條加鎖超時記錄,並設置過期時間內
"redis.call('set', key, 1); " +
"redis.call('pexpire', key, ARGV[1]); " +
// 獲取鎖過期的時間
"local remainTime = redis.call('pttl', KEYS[1]); " +
// ttl 和 30000 中選出最大值,設置為鎖的過期時間
"redis.call('pexpire', KEYS[1], math.max(remainTime, ARGV[1])); " +
"return nil; " +
"end;" +
// 返回鎖的過期時間
"return redis.call('pttl', KEYS[1]);", Arrays.asList(getRawName(), getReadWriteTimeoutNamePrefix(threadId)), unit.toMillis(leaseTime), getLockName(threadId), getWriteLockName(threadId)); }
參數說明:
KEYS = Collections.singletonList(getRawName(), getReadWriteTimeoutNamePrefix(threadId))
KEYS[1]:getRawName(),就是key的名稱,也就是獲取鎖對象時設置的"rwLock"
KEYS[2]:getReadWriteTimeoutNamePrefix(threadId)),鎖超時key,即{rwLock}:UUID:threadId:rwlock_timeout
ARGV = unit.toMillis(leaseTime), getLockName(threadId), getWriteLockName(threadId)
ARGV[1]:unit.toMillis(leaseTime),鎖過期的時間,默認30s
ARGV[2]:getLockName(threadId),UUID:ThreadId,當前線程,UUID來唯一標識一個客戶端
ARGV[3]:getWriteLockName(threadId),寫鎖名稱,UUID:threadId:write
首次加讀鎖
也就是執行lua腳本的第一個分支,Redis中的數據有一個key為rwLock結構的Hash鎖,包含鎖的模式,以及加鎖的線程
一個以當前加鎖線程的超時時間(String類型)
讀鎖重入
執行第二個分支
- 鎖模式為讀鎖,當前線程可獲取讀鎖。即:redisson提供的讀寫鎖支持不同線程重復獲取鎖
- 鎖模式為寫鎖,並且獲取寫鎖的線程為當前線程,當前線程可獲取讀鎖;即:redisson 提供的讀寫鎖,讀寫並不是完全互斥,而是支持同一線程先獲取寫鎖再獲取讀鎖,也就是 鎖的降級
關於寫鎖判斷,到分析獲取寫鎖的lua腳本時再回頭看;但是可以從這里提前知道,如果為寫鎖添加加鎖次數記錄,使用的 key 是 UUID:threadId:write,而讀鎖使用的 key 是 UUID:threadId
此時Redis中Hash結構的數據中,當前線程的的值加1,表示重入次數
並且在Redis中會再增加一條String類型的數據,表示第二次加鎖的超時時間,可以看到,當一個線程重入n次時,就會有n條對應的超時記錄,並且key最后的數字是依次遞增的
讀讀支持
也是執行第二個分支,此時Hash結構的數據中,存儲鎖的模式,獲取到鎖的線程
以及String類型的線程的超時時間
寫讀互斥
已經加了讀鎖,此時寫鎖進來,不滿足第一部分,也不滿足第二部分,直接返回當前鎖的過期時間,並訂閱消息通道 redisson_rwlock:{rwLock},然后就會在while(true)中進行自旋等待鎖的釋放
至此,整個加鎖的流程完成,從上面可以看出,在讀鎖的時候:
-
鎖 rwLock 是哈希表結構的
-
加鎖時,會對哈希表設置 mode 字段來表示這個鎖是讀鎖還是寫鎖,mode = read 表示讀鎖
-
加鎖時,會對哈希表設置當前線程 rwLock 的 UUID:ThreadId 字段,值表示重入次數
-
每次加鎖,會維護一個 key 表示這次鎖的超時時間,這個 key 的結構是 {鎖名字}:UUID:ThreadId:rwlock_timeout:重入次數
3.2 watchdog續期lua腳本
RedissonReadLock#renewExpirationAsync
protected RFuture<Boolean> renewExpirationAsync(long threadId) { // {rwLock}:UUID:threadId:rwlock_timeout
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId); // timeoutPrefix.split(":" + getLockName(threadId))[0] -> {rwLock}
String keyPrefix = getKeyPrefix(threadId, timeoutPrefix); return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 利用 hget 命令獲取當前當前線程的加鎖次數 hget rwLock UUID:threadId
"local counter = redis.call('hget', KEYS[1], ARGV[2]); " +
"if (counter ~= false) then " +
// 當前線程獲取鎖次數大於0,刷新鎖過期時間 pexpire rwLock 30000
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
// 利用 hlen 命令獲取鎖集合里面的元素個數,然后判斷是否大於1個以上key hlen rwLock
"if (redis.call('hlen', KEYS[1]) > 1) then " +
// 如果鎖集合里面key大於1個,獲取鎖集合中的所有key hkeys rwLock
"local keys = redis.call('hkeys', KEYS[1]); " +
// 遍歷每一個key
"for n, key in ipairs(keys) do " +
// hegt rwLock key 獲取其具體值
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
// 如果值為數字類型,證明此key是加鎖成功的線程,其值表示線程加鎖的次數
"if type(counter) == 'number' then " +
// 遍歷加鎖次數,刷新加鎖線程對應的過期時間
"for i=counter, 1, -1 do " +
// pexpire {rwLock}:key:rwlock_timeout:i 30000
"redis.call('pexpire', KEYS[2] .. ':' .. key .. ':rwlock_timeout:' .. i, ARGV[1]); " +
"end; " +
"end; " +
"end; " +
"end; " +
"return 1; " +
"end; " +
"return 0;", Arrays.<Object>asList(getRawName(), keyPrefix), internalLockLeaseTime, getLockName(threadId)); }
參數說明
KEYS = Arrays.asList(getRawName(), keyPrefix)
KEYS[1]:getRawName(),就是key的名稱,也就是獲取鎖對象時設置的"rwLock"
KEYS[2]:keyPrefix,{rwLock}
ARGV = internalLockLeaseTime, getLockName(threadId)
ARGV[1]:internalLockLeaseTime,鎖過期時間,其實就是watchdog超時時間,默認 30*1000 ms
ARGV[2]:getLockName(threadId),UUID:ThreadId,UUID來唯一標識一個客戶端
在上述續期的lua腳本中有一個 hlen KEYS[1](hlen rwLock) 的判斷,做這個判斷是因為 讀寫鎖 集合中,包含2個以上的鍵值對,其中一個就是鎖模式,也就是mode字段,來表示當前鎖是讀鎖還是寫鎖;后面的操作獲取鎖集合中所有的key:hkeys KEYS[1](hkeys rwLock),遍歷所有的key,並獲取其值:hget KEYS[1]key(hget rwLock key),如果key的值為數字,證明此key是加鎖成功的線程,並且value的值表示線程加鎖次數;遍歷加鎖次數利用 pexpire 為這個線程對應的加鎖記錄刷新過期時間
之所以遍歷加鎖次數,是因為在鎖重入的時候,每成功加鎖一次,redisson 都會為當前線程新增一條加鎖記錄,並且設置過期時間。
3.3 lua腳本釋放鎖
RedissonReadLock#unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) { // {myLock}:UUID:threadId:rwlock_timeout
String timeoutPrefix = getReadWriteTimeoutNamePrefix(threadId); // timeoutPrefix.split(":" + getLockName(threadId))[0] -> {myLock}
String keyPrefix = getKeyPrefix(threadId, timeoutPrefix); return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, // 利用 hget 命令獲取讀寫鎖的模式
"local mode = redis.call('hget', KEYS[1], 'mode'); " +
// 如果鎖模式為空,往讀寫鎖對應的channel發送釋放鎖的消息
"if (mode == false) then " +
// publish redisson_rwlock:{rwLock} 0
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end; " +
// 利用 hexists 命令判斷當前線程是否持有鎖 hexists rwLock UUID:threadId
"local lockExists = redis.call('hexists', KEYS[1], ARGV[2]); " +
"if (lockExists == 0) then " +
"return nil;" +
"end; " +
// 利用 hincrby 命令,給當前線程持有鎖數量減1 hincrby rwLock UUID:threadId -1
"local counter = redis.call('hincrby', KEYS[1], ARGV[2], -1); " +
// 如果持有鎖數量減1后等於0,證明當前線程不再持有鎖,那么利用 hdel 命令將鎖map中加鎖次數記錄刪掉
"if (counter == 0) then " +
// hdel rwLock UUID:threadId
"redis.call('hdel', KEYS[1], ARGV[2]); " +
"end;" +
// 刪除線程持有鎖對應的加鎖超時記錄 del {rwLock}:UUID:threadId:rwlock_timeout:count+1
"redis.call('del', KEYS[3] .. ':' .. (counter+1)); " +
// 利用 hlen 獲取鎖map中key的數量 hlen rwLock
"if (redis.call('hlen', KEYS[1]) > 1) then " +
"local maxRemainTime = -3; " +
// 獲取鎖map中 key 的數量 hkeys rwLock
"local keys = redis.call('hkeys', KEYS[1]); " +
"for n, key in ipairs(keys) do " +
// 獲取鎖map中key的值 hget rwLock key(遍歷的key)
"counter = tonumber(redis.call('hget', KEYS[1], key)); " +
// 如果值為數字
"if type(counter) == 'number' then " +
// 遍歷加鎖次數,刷新加鎖線程對應的過期時間
"for i=counter, 1, -1 do " +
// 利用 pttl 獲取超時時間 pptl {rwLock}:key:rwlock_timeout:i
"local remainTime = redis.call('pttl', KEYS[4] .. ':' .. key .. ':rwlock_timeout:' .. i); " +
// 與 maxRemainTime 對比,將最大值,賦值給 maxRemainTime
"maxRemainTime = math.max(remainTime, maxRemainTime);" +
"end; " +
"end; " +
"end; " +
// 判斷 maxRemainTime 是否大於0,如果大於0,給鎖重新設置過期時間為 maxRemainTime,然后返回0結束lua腳本的執行
"if maxRemainTime > 0 then " +
// pexpire rwLock maxRemainTime
"redis.call('pexpire', KEYS[1], maxRemainTime); " +
"return 0; " +
"end;" +
// 如果當前讀寫鎖的鎖模式是寫鎖,直接返回0結束lua腳本的執行
"if mode == 'write' then " +
"return 0;" +
"end; " +
"end; " +
// 當走到最后的操作,證明當前線程不但成功釋放鎖,並且釋放后當前讀寫鎖已經沒有其他線程再持有鎖了 // 直接將讀寫鎖對應的key直接刪掉,並且往讀寫鎖對應的channel中發布釋放鎖消息 // del rwLock
"redis.call('del', KEYS[1]); " +
// publish redisson_rwlock:{rwLock} 0
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; ", Arrays.<Object>asList(getRawName(), getChannelName(), timeoutPrefix, keyPrefix), LockPubSub.UNLOCK_MESSAGE, getLockName(threadId)); }
參數說明
KEYS = Arrays.asList(getRawName(), getChannelName(), timeoutPrefix, keyPrefix)
KEYS[1]:getRawName(),就是key的名稱,也就是獲取鎖對象時設置的"rwLock"
KEYS[2]:getChannelName(),訂閱消息的通道,redisson_rwlock:{rwLock}
KEYS[3]:timeoutPrefix,{rwLock}:UUID:threadId:rwlock_timeout
KEYS[4]:keyPrefix,{rwLock}
ARGV = LockPubSub.UNLOCK_MESSAGE, getLockName(threadId)
ARGV[1]:LockPubSub.UNLOCK_MESSAGE,Redis發布事件時的message,為 0
ARGV[2]:getLockName(threadId),UUID:ThreadId,UUID來唯一標識一個客戶端
到這里,整個讀鎖的流程全部結束,但是有兩個小小的疑問?
為什么給讀鎖扣減不需要先判斷鎖的模式?
在鎖map中記錄加鎖次數時,讀鎖的key是UUID:threadId,而寫鎖的key是UUID:threadId:write,那么就是說讀鎖的key和寫鎖的key是不一樣的。所以解鎖的時候,直接使用對應key來扣減持有鎖次數即可。
相同線程,如果獲取了寫鎖后,還是可以繼續獲取讀鎖的。所以只需要判斷鎖map有讀鎖加鎖次數記錄即可,就可以判斷當前線程是持有讀鎖的,並不需要關心當前鎖的模式。
為什么鎖map中的key都大於1了,證明肯定還有線程持有鎖,那為什么還會存在 maxRemainTime 最后小於0的情況呢?
有一個點我們還沒學到,那就是其實讀寫鎖中,如果是獲取寫鎖,並不會新增一條寫鎖的超時記錄,因為讀寫鎖中,寫鎖和寫鎖是互斥的,寫鎖和讀鎖也是互斥的,即使支持當前線程先獲取寫鎖再獲取讀鎖,其實也不需要增加一條寫鎖的超時時間,因為讀寫鎖 key 的超時時間就等於寫鎖的超時時間。