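When acquiring a Redis distributed lock, we usually give the lock an expiration time (TTL). That way, if the client crashes after acquiring the lock, the lock is not held forever. Every variant of this approach, however, runs into the same problem: there is no way to guarantee that the client finishes its work before the TTL runs out. Most programmers optimistically assume this never happens, but plenty of abnormal conditions can cause it, such as network latency or a JVM full GC.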
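To make the TTL idea concrete, here is a minimal in-memory sketch of `SET key value NX PX ttl` semantics. `FakeTtlStore` and its methods are hypothetical stand-ins for Redis, and time comes from an injected clock rather than real sleeping. It shows the case the TTL is meant for: a client that crashes without unlocking stops blocking others once the TTL has passed.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical in-memory stand-in for Redis "SET key value NX PX ttl".
// Time is read from an injected clock so expiry can be simulated without sleeping.
final class FakeTtlStore {
    private static final class Entry {
        final String value;
        final long expiresAt;

        Entry(String value, long expiresAt) {
            this.value = value;
            this.expiresAt = expiresAt;
        }
    }

    private final Map<String, Entry> data = new HashMap<>();
    private final LongSupplier clock;

    FakeTtlStore(LongSupplier clock) {
        this.clock = clock;
    }

    // Succeeds only if the key is absent or its TTL has passed (NX), then sets the TTL (PX).
    synchronized boolean setNxPx(String key, String value, long ttlMillis) {
        long now = clock.getAsLong();
        Entry current = data.get(key);
        if (current != null && current.expiresAt > now) {
            return false; // someone still holds the lock
        }
        data.put(key, new Entry(value, now + ttlMillis));
        return true;
    }

    // Returns the current holder, or null if the key is absent or expired.
    synchronized String get(String key) {
        Entry current = data.get(key);
        return (current == null || current.expiresAt <= clock.getAsLong()) ? null : current.value;
    }
}
```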
Martin Kleppmann has raised the same objection. His diagram of the failure goes as follows:
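- Client1 acquires the lock.
- Client1 starts its task and then hits a stop-the-world (STW) GC pause that lasts longer than the lock's TTL.
- The lock expires; Client2 acquires it and starts its task.
- Client1's GC finishes and it resumes its task. Now both Client1 and Client2 believe they hold the lock and both process the task, which leads to errors.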
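The four steps can be replayed deterministically with a simulated clock. This is a hypothetical single-key model (`PausedClientScenario` is not a real client), and the 30 s TTL and 40 s pause are illustrative numbers only.

```java
// Hypothetical single-key lock with SET NX PX semantics and a simulated clock,
// used to replay the GC-pause sequence step by step.
final class PausedClientScenario {
    private String owner;   // value stored under the lock key
    private long expiresAt; // absolute expiry time in ms
    private long now;       // simulated clock in ms

    boolean tryLock(String client, long ttlMillis) {
        if (owner != null && expiresAt > now) {
            return false; // still held by someone
        }
        owner = client;
        expiresAt = now + ttlMillis;
        return true;
    }

    String currentOwner() {
        return (owner != null && expiresAt > now) ? owner : null;
    }

    void advance(long millis) {
        now += millis;
    }

    // Returns {Client1 acquired, Client2 acquired, actual owner after the pause}.
    static String[] replay() {
        PausedClientScenario redis = new PausedClientScenario();
        boolean client1 = redis.tryLock("client1", 30_000); // Client1 gets the lock
        redis.advance(40_000);                              // 40 s STW pause, longer than the TTL
        boolean client2 = redis.tryLock("client2", 30_000); // lock has expired: Client2 gets it
        // Client1 resumes; nothing tells it the lock is gone, so it still acts as the holder
        return new String[] {String.valueOf(client1), String.valueOf(client2), redis.currentOwner()};
    }
}
```

Both `tryLock` calls return true, yet after the pause only Client2 is the real owner.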
How can this be solved?
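One option is to attach a WatchDog that renews the lock automatically: once the lock is acquired, a scheduled thread (the WatchDog) periodically extends the lock's TTL until the lock is released. This covers tasks that simply run longer than the TTL. Note, though, that a stop-the-world GC also freezes the WatchDog thread, so a pause longer than the TTL can still let the lock expire.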
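A small simulation shows why renewing at a fraction of the TTL keeps the lock alive. This is a hypothetical model with a simulated millisecond clock, not Redisson code; it renews every `ttl / 3`, the same ratio Redisson uses.

```java
// Hypothetical model: a task needs taskMillis of work while holding a lock whose TTL
// is ttlMillis. The clock advances 1 ms at a time. With the watchdog enabled, the
// expiry is pushed to now + ttl every ttl / 3 ms, as long as the lock is still held.
final class WatchdogSimulation {
    static boolean lockSurvives(long ttlMillis, long taskMillis, boolean withWatchdog) {
        long interval = Math.max(1, ttlMillis / 3); // renewal period, never 0
        long expiresAt = ttlMillis;                 // lock acquired at t = 0
        for (long now = 1; now <= taskMillis; now++) {
            if (withWatchdog && now % interval == 0 && now < expiresAt) {
                expiresAt = now + ttlMillis;        // renew before the lock expires
            }
            if (now >= expiresAt) {
                return false;                       // TTL ran out mid-task
            }
        }
        return true;                                // task finished while still holding the lock
    }
}
```

Renewing at one third of the TTL leaves two more renewal attempts before the extended deadline, so one slow or failed renewal does not immediately lose the lock.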
Redisson's WatchDog mechanism
// org.redisson.RedissonLock#tryAcquireAsync()
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    // An explicit leaseTime means "expire after this long": no WatchDog is started
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // No leaseTime: lock with lockWatchdogTimeout (30 s by default) as the initial TTL
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
            commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
            TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    // Handle the result asynchronously; if the lock was acquired, schedule lock renewal
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        if (e != null) {
            return;
        }
        // lock acquired (a null remaining TTL means the lock script succeeded)
        if (ttlRemaining == null) {
            // start the WatchDog
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}
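The branching above can be mirrored with the JDK's `CompletableFuture`. This is a simplified, hypothetical sketch: `AcquirePlan` and the `tryLockInner` callback stand in for `tryLockInnerAsync`, and it assumes, as in Redisson's lock script, that a `null` result means the lock was acquired and a number is the current holder's remaining TTL.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongFunction;

// Simplified, hypothetical mirror of the branch in tryAcquireAsync.
final class AcquirePlan {
    // Stand-in for lockWatchdogTimeout (Redisson's default is 30 000 ms).
    static final long WATCHDOG_TIMEOUT_MS = 30_000;

    // tryLockInner(leaseMs) completes with null when the lock is acquired,
    // or with the current holder's remaining TTL when it is not.
    static CompletableFuture<Long> tryAcquire(long leaseTimeMs,
                                              LongFunction<CompletableFuture<Long>> tryLockInner,
                                              AtomicBoolean watchdogStarted) {
        if (leaseTimeMs != -1) {
            // Explicit lease: the key simply expires after leaseTimeMs, no WatchDog
            return tryLockInner.apply(leaseTimeMs);
        }
        CompletableFuture<Long> ttlRemainingFuture = tryLockInner.apply(WATCHDOG_TIMEOUT_MS);
        ttlRemainingFuture.whenComplete((ttlRemaining, e) -> {
            if (e == null && ttlRemaining == null) {
                watchdogStarted.set(true); // stands in for scheduleExpirationRenewal(threadId)
            }
        });
        return ttlRemainingFuture;
    }
}
```

In practice this means `lock.lock()` gets a WatchDog, while `lock.lock(10, TimeUnit.SECONDS)` does not and simply expires after 10 s.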
// org.redisson.RedissonLock#scheduleExpirationRenewal()
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP =
        new ConcurrentHashMap<>();
private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    /**
     * First check whether EXPIRATION_RENEWAL_MAP already contains entryName.
     * The map is keyed by entry name (this client instance plus the lock key),
     * so the check tells whether this service instance already holds the lock.
     * If it does, only the thread id is recorded and no second WatchDog is
     * started; this is needed because RedissonLock is reentrant.
     */
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        // Only reached on the first acquisition; starts the WatchDog
        renewExpiration();
    }
}
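The reentrancy bookkeeping boils down to a `putIfAbsent` race on a concurrent map. The sketch below is hypothetical and only loosely modelled on Redisson's `ExpirationEntry` (a per-thread hold count); a counter stands in for actually calling `renewExpiration()`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the putIfAbsent pattern used by scheduleExpirationRenewal.
final class RenewalRegistry {
    // One entry per lock; counts how many times each thread has (re-)entered it.
    static final class Entry {
        private final Map<Long, Integer> threadIds = new LinkedHashMap<>();

        synchronized void addThreadId(long threadId) {
            threadIds.merge(threadId, 1, Integer::sum);
        }

        synchronized int holdCount(long threadId) {
            return threadIds.getOrDefault(threadId, 0);
        }
    }

    private final ConcurrentMap<String, Entry> entries = new ConcurrentHashMap<>();
    // Counts how often a WatchDog would have been started (renewExpiration() calls).
    final AtomicInteger watchdogsStarted = new AtomicInteger();

    void scheduleRenewal(String entryName, long threadId) {
        Entry entry = new Entry();
        Entry oldEntry = entries.putIfAbsent(entryName, entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);     // re-entry: a WatchDog is already running
        } else {
            entry.addThreadId(threadId);
            watchdogsStarted.incrementAndGet(); // first acquisition: start the WatchDog
        }
    }

    Entry entry(String entryName) {
        return entries.get(entryName);
    }
}
```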
// org.redisson.RedissonLock#renewExpiration()
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                // the lock was released (entry removed), so stop renewing
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // renewExpirationAsync extends the lease
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }
                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
        // fire once after 1/3 of the lease time (10 s with the default 30 s)
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    ee.setTimeout(task);
}
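Redisson schedules a one-shot timer task that reschedules itself only after a successful renewal, rather than using a fixed-rate schedule. Below is a hypothetical JDK-only analogue that uses `ScheduledExecutorService` in place of Redisson's internal timer; `renew` is an assumed callback that would run the renewal script.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical JDK analogue of renewExpiration(): a one-shot task that runs after
// leaseMillis / 3 and reschedules itself only if the renewal succeeded.
final class SelfReschedulingRenewer {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "lock-renewal");
        t.setDaemon(true); // do not keep the JVM alive just for renewals
        return t;
    });
    private final long intervalMillis;
    private final BooleanSupplier renew; // assumed: runs the renewal script, true if still held
    final AtomicInteger attempts = new AtomicInteger();

    SelfReschedulingRenewer(long leaseMillis, BooleanSupplier renew) {
        this.intervalMillis = Math.max(1, leaseMillis / 3);
        this.renew = renew;
    }

    void start() {
        if (!timer.isShutdown()) {
            timer.schedule(this::runOnce, intervalMillis, TimeUnit.MILLISECONDS);
        }
    }

    private void runOnce() {
        attempts.incrementAndGet();
        boolean stillHeld;
        try {
            stillHeld = renew.getAsBoolean();
        } catch (RuntimeException e) {
            stillHeld = false; // like Redisson: log (omitted here) and stop renewing
        }
        if (stillHeld) {
            start(); // reschedule itself
        }
    }

    void shutdown() {
        timer.shutdownNow();
    }

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Rescheduling only after each renewal completes means two renewals never overlap, and a failed renewal ends the chain without a separate cancel step.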
// org.redisson.RedissonLock#renewExpirationAsync()
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            // KEYS[1] = lock key, ARGV[1] = lease time in ms, ARGV[2] = this thread's lock name
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            // still held by this thread: reset the TTL to internalLockLeaseTime (30 s by default)
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return 1; " +
            "end; " +
            "return 0;",
            Collections.singletonList(getName()),
            internalLockLeaseTime, getLockName(threadId));
}
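To see what the script checks, here is a hypothetical in-memory emulation in Java. The lock is a hash whose field is the holder's lock name (in Redisson, `getLockName(threadId)` combines the client's UUID and the thread id) and whose value is the re-entry count. `synchronized` stands in for the atomicity Redis gives a Lua script, and `acquire` is simplified: it does not check for other holders.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical emulation of the renewal script on a lock stored as a Redis hash.
final class LockHashEmulator {
    private final Map<String, Map<String, Integer>> hashes = new HashMap<>();
    private final Map<String, Long> expiresAt = new HashMap<>();
    private long now; // simulated clock in ms

    synchronized void advance(long millis) {
        now += millis;
    }

    // Redis removes a key once its TTL has passed; emulated lazily on access.
    private void expireIfDue(String key) {
        Long deadline = expiresAt.get(key);
        if (deadline != null && deadline <= now) {
            hashes.remove(key);
            expiresAt.remove(key);
        }
    }

    // Simplified acquisition (HINCRBY key lockName 1, then PEXPIRE); does not check other holders.
    synchronized void acquire(String key, String lockName, long leaseMillis) {
        expireIfDue(key);
        hashes.computeIfAbsent(key, k -> new HashMap<>()).merge(lockName, 1, Integer::sum);
        expiresAt.put(key, now + leaseMillis);
    }

    // The script: if hexists(KEYS[1], ARGV[2]) == 1 then pexpire(KEYS[1], ARGV[1]) and return 1, else 0.
    synchronized boolean renew(String key, String lockName, long leaseMillis) {
        expireIfDue(key);
        Map<String, Integer> hash = hashes.get(key);
        if (hash != null && hash.containsKey(lockName)) {
            expiresAt.put(key, now + leaseMillis);
            return true;
        }
        return false;
    }
}
```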
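At its core, the WatchDog is just a periodic task. Here is a simpler example (ScheduleExecutor, ScheduleCronTask, ScheduleTask, ExpirationEntry, isStillLock and jedisCluster are not defined in this article):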
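private void startWatchdog(String lockName, String lockValue, int expiredTime) {
    // Renew every expiredTime / 3 seconds, clamped to [1, 60] so the step is never 0.
    // Note: a seconds-field step gives evenly spaced runs only when it divides 60.
    int interval = Math.max(1, Math.min(60, expiredTime / 3));
    String cronExpr = String.format("0/%s * * * * *", interval);
    ExpirationEntry expirationEntry = new ExpirationEntry(lockName, lockValue);
    ScheduleExecutor.getInstance().addTask(new ScheduleLockWatchdogTask(expirationEntry,
            new ScheduleCronTask(cronExpr, () -> {
                // isStillLock and expire are separate calls, so the pair is not atomic;
                // an atomic script like Redisson's avoids renewing a lock someone else now holds.
                if (isStillLock(lockName, lockValue)) {
                    jedisCluster.expire(lockName, expiredTime);
                } else {
                    expirationEntry.setStillLock(false);
                }
            }))).start();
}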
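public class ScheduleLockWatchdogTask implements ScheduleTask {
    // Shared with the cron callback, which marks it once the lock is no longer held
    private final ExpirationEntry expirationEntry;
    private final ScheduleTask cronTask;
    public ScheduleLockWatchdogTask(ExpirationEntry expirationEntry, ScheduleTask cronTask) {
        this.expirationEntry = expirationEntry;
        this.cronTask = cronTask;
    }
    @Override
    public void executeTask() {
        // Delegate to the cron task, which performs the check-and-renew.
        // Nothing here stops the task when stillLock becomes false; that has to
        // happen in ScheduleExecutor (not shown).
        cronTask.executeTask();
    }
}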
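If a cron-based scheduler is not available, the same WatchDog can be written with the JDK alone. This is a hypothetical sketch, not the author's code: `FixedRateWatchdog` uses `ScheduledExecutorService.scheduleAtFixedRate`, and `renewIfStillOwner` is an assumed callback that should check ownership and extend the TTL atomically (for example with a script like Redisson's). Unlike the cron version, the task cancels itself once the lock is no longer held.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Hypothetical JDK-only WatchDog: renews at 1/3 of the TTL and cancels itself
// as soon as renewIfStillOwner reports that the lock is no longer ours.
final class FixedRateWatchdog {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
        Thread t = new Thread(r, "lock-watchdog");
        t.setDaemon(true); // do not keep the JVM alive just for the watchdog
        return t;
    });
    private volatile ScheduledFuture<?> future;
    final AtomicInteger runs = new AtomicInteger();

    void start(long ttlMillis, BooleanSupplier renewIfStillOwner) {
        long period = Math.max(1, ttlMillis / 3); // renew at 1/3 of the TTL, never 0
        future = scheduler.scheduleAtFixedRate(() -> {
            runs.incrementAndGet();
            // If the supplier throws, ScheduledExecutorService suppresses later runs as well
            if (!renewIfStillOwner.getAsBoolean()) {
                ScheduledFuture<?> f = future;
                if (f != null) {
                    f.cancel(false); // lock released or lost: stop renewing
                }
            }
        }, period, period, TimeUnit.MILLISECONDS);
    }

    boolean isStopped() {
        ScheduledFuture<?> f = future;
        return f != null && f.isCancelled();
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```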