Redis分布式鎖在加鎖的時候,我們一般都會給一個鎖的過期時間(TTL),這是為了防止加鎖后client宕機,鎖無法被釋放的問題。但是所有這種姿勢的用法都會面臨同一個問題,就是沒發保證client的執行時間一定小於鎖的TTL。雖然大多數程序員都會樂觀的認為這種情況不可能發生,但是各種異常情況都會導致該問題的發生,比如網絡延遲,jvm full gc。
Martin Kleppmann也質疑過這一點,這里直接用他的圖:
- Client1獲取到鎖
- Client1開始任務,然后發生了STW的GC,時間超過了鎖的過期時間
- Client2 獲取到鎖,開始了任務
- Client1的GC結束,繼續任務,這個時候Client1和Client2都認為自己獲取了鎖,都會處理任務,從而發生錯誤。
如何解決呢?
可以給鎖設置一個WatchDog自動給鎖進行續期。實現的原理就是在加鎖成功之后啟動一個定時線程(WatchDog)自動給鎖進行續期。
Redisson WatchDog機制
// org.redisson.RedissonLock#tryAcquireAsync()
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
if (leaseTime != -1) {
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
// 異步獲取結果,如果獲取鎖成功,則啟動定時線程進行鎖續約
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
// 啟動WatchDog
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
// org.redisson.RedissonLock#scheduleExpirationRenewal()
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP =
new ConcurrentHashMap<>();
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
/**
* 首先,會先判斷在expirationRenewalMap中是否存在了entryName,
* 這是個map結構,主要還是判斷在這個服務實例中的加鎖客戶端的鎖key是否存在,
* 如果已經存在了,就直接返回;主要是考慮到RedissonLock是可重入鎖。
*/
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
// 第一次加鎖的時候會調用,內部會啟動WatchDog
renewExpiration();
}
}
// org.redisson.RedissonLock#renewExpiration()
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
// renewExpirationAsync續約租期
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
renewExpiration();
}
});
}
//每次間隔租期的1/3時間執行
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
// org.redisson.RedissonLock#renewExpirationAsync()
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
// 續約internalLockLeaseTime(30s)
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
WatchDog其實就是一個周期任務,給出一個更加簡單的例子:
private void startWatchdog(String lockName, String lockValue, int expiredTime) {
String cronExpr = String.format("0/%s * * * * *", expiredTime / 3 > 60 ? 60 : expiredTime / 3);
ExpirationEntry expirationEntry = new ExpirationEntry(lockName, lockValue);
ScheduleExecutor.getInstance().addTask(new ScheduleLockWatchdogTask(expirationEntry,
new ScheduleCronTask(cronExpr, () -> {
if (isStillLock(lockName, lockValue)) {
jedisCluster.expire(lockName, expiredTime);
} else {
expirationEntry.setStillLock(false);
}
}))).start();
}
public class ScheduleLockWatchdogTask implements ScheduleTask {
private ExpirationEntry expirationEntry;
private ScheduleTask cronTask;
public ScheduleLockWatchdogTask(ExpirationEntry expirationEntry, ScheduleTask cronTask) {
this.expirationEntry = expirationEntry;
this.cronTask = cronTask;
}
@Override
public void executeTask() {
cronTask.executeTask();
}
}