前述:
相信很多小伙伴都知道,可以使用redis客户端自带的setnx方法来实现,但是,这个锁设置多长时间合适呢?时间短了,可能请求还没完成,锁就失效了。那设置时间长点,多长合适呢?今天我们主要是讲怎么避免这个问题,以及基于注解是怎么实现分布式锁的。
开始之前,我先说明下实现的基本流程:
1、编写springboot接入redis基本配置,以及相关工具类
2、新增分布式锁的注解,并设置相关属性
3、新增注解对应的切面,并实现分布式锁的创建、校验及删除
4、新增分布式锁的续期Job
5、新增测试类,便于测试观察效果
流程图如下:
代码实现:(源码地址:https://github.com/YhcAndHc/distributed-lock)
1、springboot接入redis,此步忽略,可以直接看我的源码(我用的版本是2.4.1,redis使用lettuce客户端)
2、新增分布式锁的注解,可基于方法使用
@Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RedisLock { /** * 【必须】redis锁 key的前缀 * * @return */ String key() default ""; /** * redis锁 过期时间,默认60秒,最小不得低于10秒 * * @return */ long expire() default 60; /** * redis锁 续期重试次数 * 每五秒扫描一次,若发现锁剩余有效期低于10秒,且当前线程未执行完,则自动续期,续期时间为expire参数 * * @return */ int tryCount() default 3; }
3、针对注解做切面的逻辑处理(主要实现分布式锁的周期)
@Pointcut("@annotation(com.yhc.distributedlock.annotation.RedisLock)") public void rlPointCut() { } // 考虑过用volatile关键字来实现线程中断,但是每个请求线程之前的变量应该是独享的,所以我这里还是考虑传递线程对象 // private volatile boolean threadInternalFlag = true; @Around("rlPointCut()") public Object redisLock(ProceedingJoinPoint pjp) throws Throwable { Object result; // 获取注解对象 RedisLock redisLockAnnotation = getDeclaredAnnotation(pjp); String lockKey = redisLockAnnotation.key(); log.info("# [BEGIN]分布式锁,创建key:{}", lockKey); try { boolean dlFlag = redisService.setnx(lockKey, UUID.randomUUID().toString(), redisLockAnnotation.expire()); if (!dlFlag) { throw new DlException("# 业务繁忙,请稍后重试"); } // 添加续期任务到JOB中 lockKeyList.add(new RedisLockInfo(lockKey, redisLockAnnotation.expire(), redisLockAnnotation.tryCount(), Thread.currentThread())); result = pjp.proceed(); redisService.delete(lockKey); log.info("# [END]分布式锁,删除key:{}", lockKey); return result; } catch (DlException de) { log.warn("# 分布式锁,{}", de.getMsg()); } catch (InternalException ie) { log.error("# 分布式锁,请求超时", ie.getMessage()); throw new Exception(ie.getMessage()); } catch (Exception e) { log.error("# 分布式锁,创建失败", e); redisService.delete(lockKey); log.info("# [END]分布式锁,删除key:{}", lockKey); } // 看到很多开发在这里清理key,但是你想想, // 上个请求仍在继续,如果第二个请求进来,setnx肯定失败,会进入finally删除key,第三个请求是不是可以正常请求了呢 // finally { } return false; }
4、新建一个单独的JOB,对未处理完的请求而且锁临近过期的锁进行续期(注:这个是整个流程的核心)
private void jobRun() { Iterator<RedisLockInfo> it = lockKeyList.iterator(); while (it.hasNext()) { RedisLockInfo redisLockInfo = it.next(); String redisLockKey = redisLockInfo.getKey(); // 这里为何不是判断key是否存在,是为了避免这里判断存在之后和续期的中间时间,新的请求进来重新setnx,造成两个请求同时存在 long ttl = redisService.getExpire(redisLockKey); if (ttl < RedisLockConts.KEY_TTL && ttl > 0) { int tryNumber = redisLockInfo.getTryNumber(); int tryCount = redisLockInfo.getTryCount(); if (tryNumber >= tryCount) { log.error("# thread interrupt"); // 续期次数已用完,直接中断线程 it.remove(); Thread thread = redisLockInfo.getThread(); thread.interrupt(); continue; } boolean result = redisService.expire(redisLockKey, redisLockInfo.getExpireTime()); if (result) { tryNumber += 1; redisLockInfo.setTryNumber(tryNumber); log.info("# redis锁[key:{}],检查临近过期,完成进行第{}次续期,{}次续期后将终止请求", redisLockKey, tryNumber, tryCount); } else { // 这里可能请求线程已经完成,所以续期失败。 it.remove(); log.info("# redis锁-EXPIRE[key:{}],请求已完成,无须续期", redisLockKey); } } else if (ttl == -2) { // -2表示key不存在 // 这里可能请求线程已经完成,所以续期失败。 it.remove(); log.info("# redis锁-TTL[key:{}],请求已完成,无须续期", redisLockKey); } } }
5、测试(测试过程请继续往下看)
@RedisLock(key = "test", expire = 10)
@RequestMapping("/test")
public boolean test(@RequestParam String key, @RequestParam long threadSleepTime) {
log.info("# request begin");
try {
Thread.sleep(threadSleepTime);
return redisService.set(key, UUID.randomUUID().toString());
} catch (Exception e) {
log.error("# request fail , ", e);
}
log.info("# request end");
return false;
}
验证过程(便于测试,我这里设置锁的时间为10S,便于快速失效,大家测试可以根据实际情况调整):
测试场景一:浏览器输入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=3000
结果分析一:这里会有两种可能。请求3S完成,1、请求处理时,Job没扫描到,等扫描到时,请求已经完成;2、请求处理时,Job扫描到了,进行续期一次,然后请求完成。
测试场景二:浏览器输入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=11000
结果分析二:我们请求的线程休眠时间是11秒,而续期JOB是5S扫描一次,由于每次检测到锁的剩余时间小于10S(项目中定义的锁临近过期界限),所以会续期两次,最后到11秒,请求完成,由切面清理锁并返回。
测试场景三:浏览器输入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=60000
结果解析三:在请求处理的60S内,一共续期三次,续期次数用完,请求还未完成,此时请求被JOB主动中断,返回失败。
,
测试场景四:浏览器输入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=60000,之后打开多个窗口输入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=60000
结果解析四:以上三种主要是分了测试续期的逻辑,场景四是为了测试分布式锁是否有生效。第一个请求会一直请求等待,后面的请求会直接失败,返回false,从日志也可以看到,后面的请求获取锁都是失败的。然后第一个请求,锁的次数已续期三次,请求被JOB中断,也返回失败,说明分布式锁生效了。
注意:此处注意不要使用Chrome浏览器进行验证,这里建议大家使用firefox。Chrome浏览器同时只能对同一个URL发起一个请求,如果有更多的请求需要排队。(这个问题排查了半天,一直以为是spring框架或thread.sleep()导致的)
至此完成,大家有问题欢迎留言讨论,谢谢。