Distributed Locks (3): Lease Renewal for Distributed Locks


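When acquiring a Redis distributed lock, we usually give the lock an expiration time (TTL) so that it is still released if the client crashes after acquiring it. Every usage of this kind faces the same problem, though: there is no way to guarantee that the client's execution time stays below the lock's TTL. Most programmers optimistically assume this cannot happen, but various abnormal conditions can cause it, such as network latency or a JVM full GC.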

Martin Kleppmann has questioned this as well. The scenario shown in his diagram goes as follows:

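  1. Client1 acquires the lock.
  2. Client1 starts its task, then hits a stop-the-world (STW) GC pause that lasts longer than the lock's expiration time.
  3. Client2 acquires the lock and starts its task.
  4. Client1's GC pause ends and it resumes its task. At this point both Client1 and Client2 believe they hold the lock, both process the task, and an error results.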
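
The timeline above can be replayed without a Redis server. The following is a minimal sketch, not Kleppmann's or Redis's code: `ExpiringLockDemo`, its hand-advanced clock, and the 30 s / 40 s figures are all assumptions made for illustration, and `tryLock` only imitates the semantics of `SET key value NX PX ttl`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for a Redis lock with a TTL; time is advanced by hand. */
final class ExpiringLockDemo {
    private long nowMillis = 0; // fake clock, so the demo is deterministic
    private final Map<String, String> owners = new HashMap<>();
    private final Map<String, Long> expiresAt = new HashMap<>();

    void advance(long millis) {
        nowMillis += millis;
    }

    /** Imitates SET key owner NX PX ttl: succeeds only if the key is absent or expired. */
    boolean tryLock(String key, String owner, long ttlMillis) {
        Long deadline = expiresAt.get(key);
        if (deadline != null && deadline > nowMillis) {
            return false; // still held by someone
        }
        owners.put(key, owner);
        expiresAt.put(key, nowMillis + ttlMillis);
        return true;
    }

    /** What the store believes: the current owner, or null once the lock has expired. */
    String currentOwner(String key) {
        Long deadline = expiresAt.get(key);
        return (deadline != null && deadline > nowMillis) ? owners.get(key) : null;
    }

    public static void main(String[] args) {
        ExpiringLockDemo store = new ExpiringLockDemo();
        boolean client1Holds = store.tryLock("job", "client1", 30_000); // step 1
        store.advance(40_000);                                          // step 2: a 40 s pause
        boolean client2Holds = store.tryLock("job", "client2", 30_000); // step 3
        // Step 4: client1 resumes without re-checking, so both flags are true.
        System.out.println("client1 thinks it holds the lock: " + client1Holds);
        System.out.println("client2 thinks it holds the lock: " + client2Holds);
        System.out.println("owner according to the store: " + store.currentOwner("job"));
    }
}
```

Both clients report `true` while the store names only `client2` as the owner, which is the double-holder situation described in step 4.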

How can this be solved?

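One option is to attach a WatchDog to the lock so that its lease is renewed automatically: once the lock has been acquired, a timer thread (the WatchDog) is started and periodically extends the lock's TTL. Keep in mind that the WatchDog runs inside the same process as the task, so it covers a task that simply runs longer than the TTL, but it cannot renew anything while the whole process is frozen, for example during a stop-the-world pause longer than the TTL.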
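
The idea can be tried out with the JDK alone. This is a rough sketch under stated assumptions, not Redisson's implementation: `LeaseWatchdogSketch` is a hypothetical in-memory lease standing in for a Redis key with a TTL, and the 600 ms lease and the TTL / 3 renewal period are illustrative values.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical in-memory lease: one owner plus a deadline, standing in for a Redis key with a TTL. */
final class LeaseWatchdogSketch {
    private String owner;        // null means the lock was never taken
    private long deadlineNanos;  // lease deadline on the System.nanoTime() clock

    synchronized boolean tryLock(String who, long ttlMillis) {
        if (owner != null && System.nanoTime() - deadlineNanos < 0) {
            return false; // held and not yet expired
        }
        owner = who;
        deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(ttlMillis);
        return true;
    }

    /** Extends the lease only if the caller still owns an unexpired lock. */
    synchronized boolean renew(String who, long ttlMillis) {
        if (!isHeldBy(who)) {
            return false;
        }
        deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(ttlMillis);
        return true;
    }

    synchronized boolean isHeldBy(String who) {
        return who.equals(owner) && System.nanoTime() - deadlineNanos < 0;
    }

    /** Starts the watchdog: renew every ttl / 3. Cancel the returned future when unlocking. */
    ScheduledFuture<?> startWatchdog(ScheduledExecutorService timer, String who, long ttlMillis) {
        long period = Math.max(1, ttlMillis / 3);
        return timer.scheduleAtFixedRate(() -> renew(who, ttlMillis), period, period, TimeUnit.MILLISECONDS);
    }

    /** Sleep helper that keeps the demo free of checked exceptions. */
    static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        LeaseWatchdogSketch lock = new LeaseWatchdogSketch();
        lock.tryLock("client1", 600);
        ScheduledFuture<?> watchdog = lock.startWatchdog(timer, "client1", 600);
        pause(1_500); // the "task" runs well past the original 600 ms lease
        System.out.println("still held after 1500 ms: " + lock.isHeldBy("client1"));
        watchdog.cancel(false); // unlock path: stop renewing and let the lease run out
        timer.shutdown();
    }
}
```

Cancelling the future matters as much as starting it: if the watchdog kept running after the task finished or failed, the lock would never expire.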

Redisson WatchDog mechanism

// org.redisson.RedissonLock#tryAcquireAsync()
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
        // An explicit leaseTime was passed: lock with that TTL and do not start the WatchDog
        if (leaseTime != -1) {
            return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(
                commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
        // Get the result asynchronously; if the lock was acquired, start the timer task that renews the lease
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining == null) {
                // Start the WatchDog
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
}
// org.redisson.RedissonLock#scheduleExpirationRenewal()
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP =
        new ConcurrentHashMap<>();
private void scheduleExpirationRenewal(long threadId) {
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
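    /**
    * First check whether EXPIRATION_RENEWAL_MAP already holds an entry for entryName.
    * The map tracks which lock keys this service instance already has a renewal task for.
    * If an entry exists, only record the thread id and return without starting another task,
    * because RedissonLock is a reentrant lock.
    */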
    if (oldEntry != null) {
        oldEntry.addThreadId(threadId);
    } else {
        entry.addThreadId(threadId);
        // Called on the first acquisition; starts the WatchDog internally
        renewExpiration();
    }
}
// org.redisson.RedissonLock#renewExpiration()
private void renewExpiration() {
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // renewExpirationAsync renews the lease
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }
                
                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
      // Fires after one third of the lease time, every round
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
    
    ee.setTimeout(task);
}
// org.redisson.RedissonLock#renewExpirationAsync()
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
  return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            // Renew for internalLockLeaseTime (30 s)
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return 1; " +
            "end; " +
            "return 0;",
            Collections.singletonList(getName()),
            internalLockLeaseTime, getLockName(threadId));
}
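
Note that `renewExpiration` is not a fixed-rate job: it arms a one-shot timeout and re-arms it only when the renewal script returned 1, so renewal stops by itself once the lock is gone. The pattern can be sketched with the JDK as follows. This is an illustration under assumptions, not Redisson code: `SelfReschedulingRenewal` and its `renewOnce` supplier are hypothetical stand-ins for `renewExpirationAsync`, and the 30 ms lease is chosen only to keep the demo fast.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

/** One-shot timer task that re-arms itself only while renewal keeps succeeding. */
final class SelfReschedulingRenewal {
    private final ScheduledExecutorService timer;
    private final BooleanSupplier renewOnce; // hypothetical stand-in for renewExpirationAsync
    private final long leaseMillis;
    private final CountDownLatch stopped = new CountDownLatch(1);
    final AtomicInteger attempts = new AtomicInteger();

    SelfReschedulingRenewal(ScheduledExecutorService timer, BooleanSupplier renewOnce, long leaseMillis) {
        this.timer = timer;
        this.renewOnce = renewOnce;
        this.leaseMillis = leaseMillis;
    }

    /** Arms one timeout at lease / 3; the task decides whether to arm the next one. */
    void scheduleNext() {
        long delay = Math.max(1, leaseMillis / 3);
        timer.schedule(() -> {
            attempts.incrementAndGet();
            if (renewOnce.getAsBoolean()) {
                scheduleNext();      // lock still ours: reschedule itself
            } else {
                stopped.countDown(); // lock gone: the renewal chain ends here
            }
        }, delay, TimeUnit.MILLISECONDS);
    }

    /** Waits until the renewal chain has ended; returns false on timeout or interruption. */
    boolean awaitStopped(long timeoutMillis) {
        try {
            return stopped.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger renewalsLeft = new AtomicInteger(3); // pretend the lock disappears after 3 renewals
        SelfReschedulingRenewal renewal =
                new SelfReschedulingRenewal(timer, () -> renewalsLeft.getAndDecrement() > 0, 30);
        renewal.scheduleNext();
        renewal.awaitStopped(5_000);
        System.out.println("renewal attempts: " + renewal.attempts.get());
        timer.shutdown();
    }
}
```

In the demo the supplier succeeds three times and then fails, so the chain makes four attempts and then stops without anyone cancelling it. With a 30 s lease, the same one-third rule gives a renewal every 10 s.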

A WatchDog is really just a periodic task. Here is a simpler example:

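private void startWatchdog(String lockName, String lockValue, int expiredTime) {
    // Renew roughly every expiredTime / 3 seconds; the step is clamped to 1..59
    // so that it stays a valid step for the cron seconds field
    int stepSeconds = Math.max(1, Math.min(59, expiredTime / 3));
    String cronExpr = String.format("0/%d * * * * *", stepSeconds);
    ExpirationEntry expirationEntry = new ExpirationEntry(lockName, lockValue);
    ScheduleExecutor.getInstance().addTask(new ScheduleLockWatchdogTask(expirationEntry,
            new ScheduleCronTask(cronExpr, () -> {
                if (isStillLock(lockName, lockValue)) {
                    // Still the owner: push the expiry out by another expiredTime seconds
                    jedisCluster.expire(lockName, expiredTime);
                } else {
                    // Lock lost or released: mark the entry as no longer locked
                    expirationEntry.setStillLock(false);
                }
            }))).start();
}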

public class ScheduleLockWatchdogTask implements ScheduleTask {
    private ExpirationEntry expirationEntry;
    private ScheduleTask cronTask;

    public ScheduleLockWatchdogTask(ExpirationEntry expirationEntry, ScheduleTask cronTask) {
        this.expirationEntry = expirationEntry;
        this.cronTask = cronTask;
    }

    @Override
    public void executeTask() {
        cronTask.executeTask();
    }
}
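
One difference from the Redisson version is worth spelling out. Redisson checks ownership and extends the TTL inside a single Lua script, whereas the simplified example calls `isStillLock` and `jedisCluster.expire` separately. Assuming `isStillLock` is a plain read, the key could expire and be taken by another client between the two calls, and the `expire` would then extend that client's lock. The sketch below models the check-and-extend step as one atomic operation. `OwnerCheckedRenewal`, its hand-driven clock, and the key and token names are hypothetical; against a real Redis the same step would live in a Lua script like the one shown earlier.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory model of "extend the TTL only if the stored value still matches". */
final class OwnerCheckedRenewal {
    private long nowSeconds = 0; // hand-driven clock
    private final Map<String, String> values = new HashMap<>();
    private final Map<String, Long> expiresAt = new HashMap<>();

    synchronized void advance(long seconds) {
        nowSeconds += seconds;
    }

    private void evictIfExpired(String key) {
        Long deadline = expiresAt.get(key);
        if (deadline != null && deadline <= nowSeconds) {
            values.remove(key);
            expiresAt.remove(key);
        }
    }

    synchronized boolean lock(String key, String token, long ttlSeconds) {
        evictIfExpired(key);
        if (values.containsKey(key)) {
            return false;
        }
        values.put(key, token);
        expiresAt.put(key, nowSeconds + ttlSeconds);
        return true;
    }

    /** Check and extend as one step, so a stale owner can never touch a newer holder's TTL. */
    synchronized boolean renewIfOwner(String key, String token, long ttlSeconds) {
        evictIfExpired(key);
        if (!token.equals(values.get(key))) {
            return false;
        }
        expiresAt.put(key, nowSeconds + ttlSeconds);
        return true;
    }

    synchronized long remainingSeconds(String key) {
        evictIfExpired(key);
        Long deadline = expiresAt.get(key);
        return deadline == null ? 0 : deadline - nowSeconds;
    }

    public static void main(String[] args) {
        OwnerCheckedRenewal store = new OwnerCheckedRenewal();
        store.lock("order:42", "token-A", 30);  // client A takes the lock
        store.advance(31);                      // A's lease runs out
        store.lock("order:42", "token-B", 30);  // client B takes over
        store.advance(5);
        // A's late watchdog tick is rejected and B's lease is left untouched.
        System.out.println("stale renewal accepted: " + store.renewIfOwner("order:42", "token-A", 30));
        System.out.println("seconds left on B's lease: " + store.remainingSeconds("order:42"));
    }
}
```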

