基於注解的分布式鎖實現


前述:

相信很多小伙伴都知道,可以使用redis客戶端自帶的setnx方法來實現,但是,這個鎖設置多長時間合適呢?時間短了,可能請求還沒完成,鎖就失效了。那設置時間長點,多長合適呢?今天我們主要是講怎么避免這個問題,以及基於注解是怎么實現分布式鎖的。

 

開始之前,我先說明下實現的基本流程:

1、編寫springboot接入redis基本配置,以及相關工具類

2、新增分布式鎖的注解,並設置相關屬性

3、新增注解對應的切面,並實現分布式鎖的創建、校驗及刪除

4、新增分布式鎖的續期Job

5、新增測試類,便於測試觀察效果

 

流程圖如下:

 代碼實現:(源碼地址:https://github.com/YhcAndHc/distributed-lock

  1、springboot接入redis,此步忽略,可以直接看我的源碼(我用的版本是2.4.1,redis使用lettuce客戶端)

  2、新增分布式鎖的注解,可基於方法使用 

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisLock {

    /**
     * 【必須】redis鎖 key的前綴
     *
     * @return
     */
    String key() default "";

    /**
     * redis鎖 過期時間,默認60秒,最小不得低於10秒
     *
     * @return
     */
    long expire() default 60;

    /**
     * redis鎖 續期重試次數
     * 每五秒掃描一次,若發現鎖剩余有效期低於10秒,且當前線程未執行完,則自動續期,續期時間為expire參數
     *
     * @return
     */
    int tryCount() default 3;
}

   

  3、針對注解做切面的邏輯處理(主要實現分布式鎖的周期)

    @Pointcut("@annotation(com.yhc.distributedlock.annotation.RedisLock)")
    public void rlPointCut() {
    }

    // 考慮過用volatile關鍵字來實現線程中斷,但是每個請求線程之前的變量應該是獨享的,所以我這里還是考慮傳遞線程對象
//    private volatile boolean threadInternalFlag = true;

    @Around("rlPointCut()")
    public Object redisLock(ProceedingJoinPoint pjp) throws Throwable {
        Object result;

        // 獲取注解對象
        RedisLock redisLockAnnotation = getDeclaredAnnotation(pjp);
        String lockKey = redisLockAnnotation.key();
        log.info("# [BEGIN]分布式鎖,創建key:{}", lockKey);
        try {
            boolean dlFlag = redisService.setnx(lockKey, UUID.randomUUID().toString(), redisLockAnnotation.expire());
            if (!dlFlag) {
                throw new DlException("# 業務繁忙,請稍后重試");
            }
            // 添加續期任務到JOB中
            lockKeyList.add(new RedisLockInfo(lockKey, redisLockAnnotation.expire(), redisLockAnnotation.tryCount(), Thread.currentThread()));
            result = pjp.proceed();
            redisService.delete(lockKey);
            log.info("# [END]分布式鎖,刪除key:{}", lockKey);
            return result;
        } catch (DlException de) {
            log.warn("# 分布式鎖,{}", de.getMsg());
        } catch (InternalException ie) {
            log.error("# 分布式鎖,請求超時", ie.getMessage());
            throw new Exception(ie.getMessage());
        } catch (Exception e) {
            log.error("# 分布式鎖,創建失敗", e);
            redisService.delete(lockKey);
            log.info("# [END]分布式鎖,刪除key:{}", lockKey);
        }
        // 看到很多開發在這里清理key,但是你想想,
        // 上個請求仍在繼續,如果第二個請求進來,setnx肯定失敗,會進入finally刪除key,第三個請求是不是可以正常請求了呢
        // finally { }
        return false;
    }

 

  4、新建一個單獨的JOB,對未處理完的請求而且鎖臨近過期的鎖進行續期(注:這個是整個流程的核心

    private void jobRun() {
        Iterator<RedisLockInfo> it = lockKeyList.iterator();
        while (it.hasNext()) {
            RedisLockInfo redisLockInfo = it.next();
            String redisLockKey = redisLockInfo.getKey();

            // 這里為何不是判斷key是否存在,是為了避免這里判斷存在之后和續期的中間時間,新的請求進來重新setnx,造成兩個請求同時存在
            long ttl = redisService.getExpire(redisLockKey);
            if (ttl < RedisLockConts.KEY_TTL && ttl > 0) {
                int tryNumber = redisLockInfo.getTryNumber();
                int tryCount = redisLockInfo.getTryCount();
                if (tryNumber >= tryCount) {
                    log.error("# thread interrupt");
                    // 續期次數已用完,直接中斷線程
                    it.remove();
                    Thread thread = redisLockInfo.getThread();
                    thread.interrupt();
                    continue;
                }

                boolean result = redisService.expire(redisLockKey, redisLockInfo.getExpireTime());
                if (result) {
                    tryNumber += 1;
                    redisLockInfo.setTryNumber(tryNumber);
                    log.info("# redis鎖[key:{}],檢查臨近過期,完成進行第{}次續期,{}次續期后將終止請求", redisLockKey, tryNumber, tryCount);
                } else {
                    // 這里可能請求線程已經完成,所以續期失敗。
                    it.remove();
                    log.info("# redis鎖-EXPIRE[key:{}],請求已完成,無須續期", redisLockKey);
                }

            } else if (ttl == -2) { // -2表示key不存在
                // 這里可能請求線程已經完成,所以續期失敗。
                it.remove();
                log.info("# redis鎖-TTL[key:{}],請求已完成,無須續期", redisLockKey);
            }
        }
    }

  

  5、測試(測試過程請繼續往下看)


@RedisLock(key = "test", expire = 10)
@RequestMapping("/test")
public boolean test(@RequestParam String key, @RequestParam long threadSleepTime) {
log.info("# request begin");
try {
Thread.sleep(threadSleepTime);
return redisService.set(key, UUID.randomUUID().toString());
} catch (Exception e) {
log.error("# request fail , ", e);
}
log.info("# request end");
return false;
}

 

驗證過程(便於測試,我這里設置鎖的時間為10S,便於快速失效,大家測試可以根據實際情況調整)

  測試場景一:瀏覽器輸入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=3000

  結果分析一:這里會有兩種可能。請求3S完成,1、請求處理時,Job沒掃描到,等掃描到時,請求已經完成;2、請求處理時,Job掃描到了,進行續期一次,然后請求完成。

  

  

  測試場景二:瀏覽器輸入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=11000

  結果分析二:我們請求的線程休眠時間是11秒,而續期JOB是5S掃描一次,由於每次檢測到鎖的剩余時間小於10S(項目中定義的鎖臨近過期界限),所以會續期兩次,最后到11秒,請求完成,由切面清理鎖並返回。

  

  

  測試場景三:瀏覽器輸入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=60000

  結果解析三:在請求處理的60S內,一共續期三次,續期次數用完,請求還未完成,此時請求被JOB主動中斷,返回失敗。

,  

 

   

  測試場景四:瀏覽器輸入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=60000,之后打開多個窗口輸入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=60000

  結果解析四:以上三種主要是分了測試續期的邏輯,場景四是為了測試分布式鎖是否有生效。第一個請求會一直請求等待,后面的請求會直接失敗,返回false,從日志也可以看到,后面的請求獲取鎖都是失敗的。然后第一個請求,鎖的次數已續期三次,請求被JOB中斷,也返回失敗,說明分布式鎖生效了。

  注意:此處注意不要使用Chrome瀏覽器進行驗證,這里建議大家使用firefox。Chrome瀏覽器同時只能對同一個URL發起一個請求,如果有更多的請求需要排隊。(這個問題排查了半天,一直以為是spring框架或thread.sleep()導致的)

  

 

 

 

 

至此完成,大家有問題歡迎留言討論,謝謝。

 

 
       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM