基于注解的分布式锁实现


前述:

相信很多小伙伴都知道,可以使用redis客户端自带的setnx方法来实现,但是,这个锁设置多长时间合适呢?时间短了,可能请求还没完成,锁就失效了。那设置时间长点,多长合适呢?今天我们主要是讲怎么避免这个问题,以及基于注解是怎么实现分布式锁的。

 

开始之前,我先说明下实现的基本流程:

1、编写springboot接入redis基本配置,以及相关工具类

2、新增分布式锁的注解,并设置相关属性

3、新增注解对应的切面,并实现分布式锁的创建、校验及删除

4、新增分布式锁的续期Job

5、新增测试类,便于测试观察效果

 

流程图如下:

 代码实现:(源码地址:https://github.com/YhcAndHc/distributed-lock

  1、springboot接入redis,此步忽略,可以直接看我的源码(我用的版本是2.4.1,redis使用lettuce客户端)

  2、新增分布式锁的注解,可基于方法使用 

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisLock {

    /**
     * 【必须】redis锁 key的前缀
     *
     * @return
     */
    String key() default "";

    /**
     * redis锁 过期时间,默认60秒,最小不得低于10秒
     *
     * @return
     */
    long expire() default 60;

    /**
     * redis锁 续期重试次数
     * 每五秒扫描一次,若发现锁剩余有效期低于10秒,且当前线程未执行完,则自动续期,续期时间为expire参数
     *
     * @return
     */
    int tryCount() default 3;
}

   

  3、针对注解做切面的逻辑处理(主要实现分布式锁的周期)

    @Pointcut("@annotation(com.yhc.distributedlock.annotation.RedisLock)")
    public void rlPointCut() {
    }

    // 考虑过用volatile关键字来实现线程中断,但是每个请求线程之前的变量应该是独享的,所以我这里还是考虑传递线程对象
//    private volatile boolean threadInternalFlag = true;

    @Around("rlPointCut()")
    public Object redisLock(ProceedingJoinPoint pjp) throws Throwable {
        Object result;

        // 获取注解对象
        RedisLock redisLockAnnotation = getDeclaredAnnotation(pjp);
        String lockKey = redisLockAnnotation.key();
        log.info("# [BEGIN]分布式锁,创建key:{}", lockKey);
        try {
            boolean dlFlag = redisService.setnx(lockKey, UUID.randomUUID().toString(), redisLockAnnotation.expire());
            if (!dlFlag) {
                throw new DlException("# 业务繁忙,请稍后重试");
            }
            // 添加续期任务到JOB中
            lockKeyList.add(new RedisLockInfo(lockKey, redisLockAnnotation.expire(), redisLockAnnotation.tryCount(), Thread.currentThread()));
            result = pjp.proceed();
            redisService.delete(lockKey);
            log.info("# [END]分布式锁,删除key:{}", lockKey);
            return result;
        } catch (DlException de) {
            log.warn("# 分布式锁,{}", de.getMsg());
        } catch (InternalException ie) {
            log.error("# 分布式锁,请求超时", ie.getMessage());
            throw new Exception(ie.getMessage());
        } catch (Exception e) {
            log.error("# 分布式锁,创建失败", e);
            redisService.delete(lockKey);
            log.info("# [END]分布式锁,删除key:{}", lockKey);
        }
        // 看到很多开发在这里清理key,但是你想想,
        // 上个请求仍在继续,如果第二个请求进来,setnx肯定失败,会进入finally删除key,第三个请求是不是可以正常请求了呢
        // finally { }
        return false;
    }

 

  4、新建一个单独的JOB,对未处理完的请求而且锁临近过期的锁进行续期(注:这个是整个流程的核心

    private void jobRun() {
        Iterator<RedisLockInfo> it = lockKeyList.iterator();
        while (it.hasNext()) {
            RedisLockInfo redisLockInfo = it.next();
            String redisLockKey = redisLockInfo.getKey();

            // 这里为何不是判断key是否存在,是为了避免这里判断存在之后和续期的中间时间,新的请求进来重新setnx,造成两个请求同时存在
            long ttl = redisService.getExpire(redisLockKey);
            if (ttl < RedisLockConts.KEY_TTL && ttl > 0) {
                int tryNumber = redisLockInfo.getTryNumber();
                int tryCount = redisLockInfo.getTryCount();
                if (tryNumber >= tryCount) {
                    log.error("# thread interrupt");
                    // 续期次数已用完,直接中断线程
                    it.remove();
                    Thread thread = redisLockInfo.getThread();
                    thread.interrupt();
                    continue;
                }

                boolean result = redisService.expire(redisLockKey, redisLockInfo.getExpireTime());
                if (result) {
                    tryNumber += 1;
                    redisLockInfo.setTryNumber(tryNumber);
                    log.info("# redis锁[key:{}],检查临近过期,完成进行第{}次续期,{}次续期后将终止请求", redisLockKey, tryNumber, tryCount);
                } else {
                    // 这里可能请求线程已经完成,所以续期失败。
                    it.remove();
                    log.info("# redis锁-EXPIRE[key:{}],请求已完成,无须续期", redisLockKey);
                }

            } else if (ttl == -2) { // -2表示key不存在
                // 这里可能请求线程已经完成,所以续期失败。
                it.remove();
                log.info("# redis锁-TTL[key:{}],请求已完成,无须续期", redisLockKey);
            }
        }
    }

  

  5、测试(测试过程请继续往下看)


@RedisLock(key = "test", expire = 10)
@RequestMapping("/test")
public boolean test(@RequestParam String key, @RequestParam long threadSleepTime) {
log.info("# request begin");
try {
Thread.sleep(threadSleepTime);
return redisService.set(key, UUID.randomUUID().toString());
} catch (Exception e) {
log.error("# request fail , ", e);
}
log.info("# request end");
return false;
}

 

验证过程(便于测试,我这里设置锁的时间为10S,便于快速失效,大家测试可以根据实际情况调整)

  测试场景一:浏览器输入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=3000

  结果分析一:这里会有两种可能。请求3S完成,1、请求处理时,Job没扫描到,等扫描到时,请求已经完成;2、请求处理时,Job扫描到了,进行续期一次,然后请求完成。

  

  

  测试场景二:浏览器输入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=11000

  结果分析二:我们请求的线程休眠时间是11秒,而续期JOB是5S扫描一次,由于每次检测到锁的剩余时间小于10S(项目中定义的锁临近过期界限),所以会续期两次,最后到11秒,请求完成,由切面清理锁并返回。

  

  

  测试场景三:浏览器输入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=60000

  结果解析三:在请求处理的60S内,一共续期三次,续期次数用完,请求还未完成,此时请求被JOB主动中断,返回失败。

,  

 

   

  测试场景四:浏览器输入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=60000,之后打开多个窗口输入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=60000

  结果解析四:以上三种主要是分了测试续期的逻辑,场景四是为了测试分布式锁是否有生效。第一个请求会一直请求等待,后面的请求会直接失败,返回false,从日志也可以看到,后面的请求获取锁都是失败的。然后第一个请求,锁的次数已续期三次,请求被JOB中断,也返回失败,说明分布式锁生效了。

  注意:此处注意不要使用Chrome浏览器进行验证,这里建议大家使用firefox。Chrome浏览器同时只能对同一个URL发起一个请求,如果有更多的请求需要排队。(这个问题排查了半天,一直以为是spring框架或thread.sleep()导致的)

  

 

 

 

 

至此完成,大家有问题欢迎留言讨论,谢谢。

 


					


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM