博客待整理,先只是把源碼看了....
后面不再備注redis中的命令含義了,這樣備注寫太多了不好閱讀.
package org.redisson;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import org.redisson.api.RFuture;
import org.redisson.api.RLock;
import org.redisson.client.codec.LongCodec;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.command.CommandExecutor;
import org.redisson.pubsub.LockPubSub;
/**
* Distributed implementation of {@link java.util.concurrent.locks.Lock}
* Implements reentrant lock.<br>
* Lock will be removed automatically if client disconnects.
* <p>
* Implements a <b>fair</b> locking so it guarantees an acquire order by threads.
*
* @author Nikita Koksharov
*
*/
/** vergilyn mark: <br/>
* remove stale threads: 為什么不寫成公共的? <br/>
* redisson是如何解決服務器之間時間不同步的: 因為{Redisson.UUID}, 不同服務器生成UUID不同, 所以其時間戳的值肯定會來至原有的服務器. <br/>
* "redisson_lock_queue:{lock_name}": 這LIST的順序就是獲取鎖的順序, 即FairLock實現原理. <br/>
* "redisson_lock_timeout:{xxx}": 這其中的score並不表示獲取鎖的順序, 而是表示線程競爭鎖的失效時間點. <br/>
*/
public class RedissonFairLock extends RedissonLock implements RLock {
private final long threadWaitTime = 5000;
private final CommandExecutor commandExecutor;
/** vergilyn mark: <br/>
* 構造函數, 被protected修飾, 說明不能在外部通過new獲取; 一般的構建是:{@link Redisson#getFairLock(String)}. <br/>
* @param name lock的'鎖名', 實際定義在{@link RedissonObject#name}
* @param id
*/
protected RedissonFairLock(CommandExecutor commandExecutor, String name, UUID id) {
super(commandExecutor, name, id);
this.commandExecutor = commandExecutor;
}
/** vergilyn mark: <br/>
* 未被修飾符修飾, 默認即friendly: 在同一個包中的類可以訪問, 其他包中的類不能訪問. <br/>
* type : LIST <br/>
* key : "redisson_lock_queue:{lock_name}" <br/>
* value: "{Redisson.UUID}:{threadId}" <br/>
* @return ex, redisson_lock_queue:{lock_name}
*/
String getThreadsQueueName() {
return prefixName("redisson_lock_queue", getName());
}
/** vergilyn mark: <br/>
* type : SORT-SET <br/>
* key : "redisson_lock_timeout:{lock_name}" <br/>
* value: "{Redisson.UUID}:{threadId}" <br/>
* score: 時間戳 <br/>
* @return ex, redisson_lock_timeout:{lock_name}
*/
String getTimeoutSetName() {
return prefixName("redisson_lock_timeout", getName());
}
@Override
protected RedissonLockEntry getEntry(long threadId) {
return PUBSUB.getEntry(getEntryName() + ":" + threadId);
}
/** vergilyn mark: <br/>
* 訂閱, 重寫: {@link RedissonLock#subscribe(long)}
*/
@Override
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return PUBSUB.subscribe(getEntryName() + ":" + threadId,
getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager());
}
/** vergilyn mark: <br/>
* 取消訂閱, 重寫: {@link RedissonLock#unsubscribe(RFuture, long)}
*/
@Override
protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
PUBSUB.unsubscribe(future.getNow(), getEntryName() + ":" + threadId,
getChannelName() + ":" + getLockName(threadId), commandExecutor.getConnectionManager());
}
@Override
protected RFuture<Void> acquireFailedAsync(long threadId) {
/* vergilyn mark: lua中數組下標是從1開始
* lindex key index : LIST, 返回列表中下標為指定索引值的元素. 如果指定索引值不在列表的區間范圍內, 返回 nil.
* zrange key start stop [WITHSCORES] : SORT-SET, 通過索引區間返回有序集合成指定區間內的成員.
* zincrby key increment member : SORT-SET, 有序集合中對指定成員的分數加上增量 increment.
* zrem key member [member ...] : SORT-SET, 移除有序集合中的一個或多個成員
* lrem key count value : LIST, 移除列表元素(count=0, 表示全部)
*/
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_VOID,
// KEYS[1]: "redisson_lock_queue:{xxx}", ARGV[1]: "{Redisson.UUID}:{threadId}"
// KEYS[2]: "redisson_lock_timeout:{xxx}", ARGV[2]: "{threadWaitTime}"(默認: 5000ms)
"local firstThreadId = redis.call('lindex', KEYS[1], 0); " +
"if firstThreadId == ARGV[1] then " +
"local keys = redis.call('zrange', KEYS[2], 0, -1); " +
"for i = 1, #keys, 1 do " +
"redis.call('zincrby', KEYS[2], -tonumber(ARGV[2]), keys[i]);" +
"end;" +
"end;" +
"redis.call('zrem', KEYS[2], ARGV[1]); " +
"redis.call('lrem', KEYS[1], 0, ARGV[1]); ",
Arrays.<Object>asList(getThreadsQueueName(), getTimeoutSetName()), // KEYS
getLockName(threadId), threadWaitTime); // PARAMS
}
/** vergilyn mark: <br/>
* FairLock核心方法, 嘗試獲取鎖(內部異步獲取);
* remark: 因為{Redisson.UUID}, 解決了服務器之間的時間不同步
* @param leaseTime 鎖自動釋放的時長
* @param unit leaseTime的時間單位
* @param threadId 線程
* @return
*/
@Override
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
long currentTime = System.currentTimeMillis();
if (command == RedisCommands.EVAL_NULL_BOOLEAN) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// remove stale threads: 移除陳舊的線程
// KEYS[1]: "lock_name", ARGV[1]: "{leaseTime}"
// KEYS[2]: "redisson_lock_queue:{xxx}", ARGV[2]: "{Redisson.UUID}:{threadId}"
// KEYS[3]: "redisson_lock_timeout:{xxx}", ARGV[3]: "{currentTime}" (部署服務器的時間ms, 不是redis-server的服務器時間)
"while true do "
+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
+ "if timeout <= tonumber(ARGV[3]) then "
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
+ "end; "
+ "end;"
+
"if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
+ "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
"redis.call('lpop', KEYS[2]); " +
"redis.call('zrem', KEYS[3], ARGV[2]); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return 1;",
Arrays.<Object>asList(getName(), getThreadsQueueName(), getTimeoutSetName()), // KEYS
internalLockLeaseTime, getLockName(threadId), currentTime); // PARAMS
}
if (command == RedisCommands.EVAL_LONG) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// remove stale threads: 移除無效的競爭鎖的線程
// KEYS[1]: "lock_name", ARGV[1]: "{leaseTime}"
// KEYS[2]: "redisson_lock_queue:{xxx}", ARGV[2]: "{Redisson.UUID}:{threadId}"
// KEYS[3]: "redisson_lock_timeout:{xxx}", ARGV[3]: "{currentTime + threadWaitTime}" (部署服務器的時間ms, 不是redis-server的服務器時間)
// ARGV[4]: "{currentTime}"
// 移除競爭lock_name中無效的線程
"while true do "
+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
+ "if timeout <= tonumber(ARGV[4]) then " // 如果存在且已失效, 則相應的從"redisson_lock_queue:{xxx}"、"redisson_lock_timeout:{xxx}"中移除
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
+ "end; "
+ "end;"
// exists: 若 key 存在返回 1 ,否則返回 0 。
// 1. KEYS[1]: 記錄獲取到lock_name的線程, 及記錄線程獲取鎖的次數. (此HASH只會存在一個field)
// type: HASH, key: "lock_name", field: "{Redisson.UUID}:{threadId}", value: 1 (value表示線程獲取鎖的次數, 釋放鎖時必須釋放相同的次數, 才會釋放鎖lock_name)
// 2. 當不存在"redisson_lock_queue:{lock_name}"時, 表示沒有競爭對手存在; 或, 競爭隊列的第一個值為當前線程 (因為是FairLock, 所以要滿足 redis.call('lindex', KEYS[2], 0) == ARGV[2])
+ "if (redis.call('exists', KEYS[1]) == 0) and ((redis.call('exists', KEYS[2]) == 0) "
+ "or (redis.call('lindex', KEYS[2], 0) == ARGV[2])) then " +
// 當前線程獲取到鎖, 從queue、timeout中移除
"redis.call('lpop', KEYS[2]); " +
"redis.call('zrem', KEYS[3], ARGV[2]); " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " + // 標記當前鎖已被某個線程獲取
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 設置標記的失效時常, 默認是 30 * 1000 ms
"return nil; " +
"end; " +
// (重復獲取鎖) 當前線程即持有鎖的線程, 即可重入鎖(Reentrant Lock)
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + // 記錄線程獲取鎖的次數
"redis.call('pexpire', KEYS[1], ARGV[1]); " + // 重置失效時長
"return nil; " +
"end; " +
"local firstThreadId = redis.call('lindex', KEYS[2], 0); " +
"local ttl; " +
"if firstThreadId ~= false and firstThreadId ~= ARGV[2] then " + // 隊列存在, 且隊列的第一個值 ≠ 當前線程
"ttl = tonumber(redis.call('zscore', KEYS[3], firstThreadId)) - tonumber(ARGV[4]);" +
"else "
+ "ttl = redis.call('pttl', KEYS[1]);" +
"end; " +
"local timeout = ttl + tonumber(ARGV[3]);" + // 計算競爭線程的失效時間點
"if redis.call('zadd', KEYS[3], timeout, ARGV[2]) == 1 then " + // 設置當前線程競爭鎖的失效時間點
"redis.call('rpush', KEYS[2], ARGV[2]);" + // 將當前線程加入競爭鎖的隊列中
"end; " +
"return ttl;", // 返回持有鎖的剩余時長,
Arrays.<Object>asList(getName(), getThreadsQueueName(), getTimeoutSetName()),
internalLockLeaseTime, getLockName(threadId), currentTime + threadWaitTime, currentTime);
}
throw new IllegalArgumentException();
}
/** vergilyn mark: <br/>
* 釋放鎖(內部異步); 只有持有鎖的線程才能釋放鎖, 且線程獲取多少次鎖, 就要釋放多少次, 否則不會釋放(除非鎖達到失效時長).
* lua執行返回: 1, 成功釋放鎖; nil/0, 未釋放任何鎖.
* @see RedissonFairLock#forceUnlockAsync()
*/
@Override
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// remove stale threads: 移除無效的獲取鎖的線程
"while true do "
+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
+ "if timeout <= tonumber(ARGV[4]) then "
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
+ "end; "
+ "end;"
// 鎖已釋放(KEYS[1]已自動失效), 通知下一個獲取鎖的線程
+ "if (redis.call('exists', KEYS[1]) == 0) then " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; " + // 成功釋放鎖
"end;" +
// 當前線程不是持有鎖的線程, 不允許釋放鎖
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 當前線程是持有鎖的線程, value: 遞減
// (ReentrantLock概念) 當同一線程中多次獲取鎖, 必須釋放相同多的次數, 才會最終釋放鎖
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " + // 每次遞減后重置失效時長
"return 0; " + // 釋放鎖失敗
"end; " +
// 當前線程所有獲取鎖的地方都釋放了鎖, 則正確的釋放鎖
"redis.call('del', KEYS[1]); " +
// 通知下一個競爭相同鎖的線程.
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; ", // 成功釋放鎖
Arrays.<Object>asList(getName(), getThreadsQueueName(), getTimeoutSetName(), getChannelName()),
LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId), System.currentTimeMillis());
}
@Override
public Condition newCondition() {
throw new UnsupportedOperationException();
}
@Override
public RFuture<Boolean> deleteAsync() {
return commandExecutor.writeAsync(getName(), RedisCommands.DEL_OBJECTS, getName(), getThreadsQueueName(), getTimeoutSetName());
}
/** vergilyn mark: <br/>
* 強制釋放鎖
* @see RedissonLock#unlockInnerAsync
*/
@Override
public RFuture<Boolean> forceUnlockAsync() {
cancelExpirationRenewal();
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// remove stale threads
"while true do "
+ "local firstThreadId2 = redis.call('lindex', KEYS[2], 0);"
+ "if firstThreadId2 == false then "
+ "break;"
+ "end; "
+ "local timeout = tonumber(redis.call('zscore', KEYS[3], firstThreadId2));"
+ "if timeout <= tonumber(ARGV[2]) then "
+ "redis.call('zrem', KEYS[3], firstThreadId2); "
+ "redis.call('lpop', KEYS[2]); "
+ "else "
+ "break;"
+ "end; "
+ "end;"
+
// 強制釋放鎖, 即使當前線程不是持有鎖的線程, 並通知下一個競爭相同鎖的線程
"if (redis.call('del', KEYS[1]) == 1) then " +
"local nextThreadId = redis.call('lindex', KEYS[2], 0); " +
"if nextThreadId ~= false then " +
"redis.call('publish', KEYS[4] .. ':' .. nextThreadId, ARGV[1]); " +
"end; " +
"return 1; " + // 強制釋放鎖成功
"end; " +
"return 0;", // 強制釋放鎖失敗
Arrays.<Object>asList(getName(), getThreadsQueueName(), getTimeoutSetName(), getChannelName()),
LockPubSub.unlockMessage, System.currentTimeMillis());
}
}
