前述:
相信很多小伙伴都知道,可以使用redis客戶端自帶的setnx方法來實現,但是,這個鎖設置多長時間合適呢?時間短了,可能請求還沒完成,鎖就失效了。那設置時間長點,多長合適呢?今天我們主要是講怎么避免這個問題,以及基於注解是怎么實現分布式鎖的。
開始之前,我先說明下實現的基本流程:
1、編寫springboot接入redis基本配置,以及相關工具類
2、新增分布式鎖的注解,並設置相關屬性
3、新增注解對應的切面,並實現分布式鎖的創建、校驗及刪除
4、新增分布式鎖的續期Job
5、新增測試類,便於測試觀察效果
流程圖如下:
代碼實現:(源碼地址:https://github.com/YhcAndHc/distributed-lock)
1、springboot接入redis,此步忽略,可以直接看我的源碼(我用的版本是2.4.1,redis使用lettuce客戶端)
2、新增分布式鎖的注解,可基於方法使用
@Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RedisLock { /** * 【必須】redis鎖 key的前綴 * * @return */ String key() default ""; /** * redis鎖 過期時間,默認60秒,最小不得低於10秒 * * @return */ long expire() default 60; /** * redis鎖 續期重試次數 * 每五秒掃描一次,若發現鎖剩余有效期低於10秒,且當前線程未執行完,則自動續期,續期時間為expire參數 * * @return */ int tryCount() default 3; }
3、針對注解做切面的邏輯處理(主要實現分布式鎖的周期)
@Pointcut("@annotation(com.yhc.distributedlock.annotation.RedisLock)") public void rlPointCut() { } // 考慮過用volatile關鍵字來實現線程中斷,但是每個請求線程之前的變量應該是獨享的,所以我這里還是考慮傳遞線程對象 // private volatile boolean threadInternalFlag = true; @Around("rlPointCut()") public Object redisLock(ProceedingJoinPoint pjp) throws Throwable { Object result; // 獲取注解對象 RedisLock redisLockAnnotation = getDeclaredAnnotation(pjp); String lockKey = redisLockAnnotation.key(); log.info("# [BEGIN]分布式鎖,創建key:{}", lockKey); try { boolean dlFlag = redisService.setnx(lockKey, UUID.randomUUID().toString(), redisLockAnnotation.expire()); if (!dlFlag) { throw new DlException("# 業務繁忙,請稍后重試"); } // 添加續期任務到JOB中 lockKeyList.add(new RedisLockInfo(lockKey, redisLockAnnotation.expire(), redisLockAnnotation.tryCount(), Thread.currentThread())); result = pjp.proceed(); redisService.delete(lockKey); log.info("# [END]分布式鎖,刪除key:{}", lockKey); return result; } catch (DlException de) { log.warn("# 分布式鎖,{}", de.getMsg()); } catch (InternalException ie) { log.error("# 分布式鎖,請求超時", ie.getMessage()); throw new Exception(ie.getMessage()); } catch (Exception e) { log.error("# 分布式鎖,創建失敗", e); redisService.delete(lockKey); log.info("# [END]分布式鎖,刪除key:{}", lockKey); } // 看到很多開發在這里清理key,但是你想想, // 上個請求仍在繼續,如果第二個請求進來,setnx肯定失敗,會進入finally刪除key,第三個請求是不是可以正常請求了呢 // finally { } return false; }
4、新建一個單獨的JOB,對未處理完的請求而且鎖臨近過期的鎖進行續期(注:這個是整個流程的核心)
private void jobRun() { Iterator<RedisLockInfo> it = lockKeyList.iterator(); while (it.hasNext()) { RedisLockInfo redisLockInfo = it.next(); String redisLockKey = redisLockInfo.getKey(); // 這里為何不是判斷key是否存在,是為了避免這里判斷存在之后和續期的中間時間,新的請求進來重新setnx,造成兩個請求同時存在 long ttl = redisService.getExpire(redisLockKey); if (ttl < RedisLockConts.KEY_TTL && ttl > 0) { int tryNumber = redisLockInfo.getTryNumber(); int tryCount = redisLockInfo.getTryCount(); if (tryNumber >= tryCount) { log.error("# thread interrupt"); // 續期次數已用完,直接中斷線程 it.remove(); Thread thread = redisLockInfo.getThread(); thread.interrupt(); continue; } boolean result = redisService.expire(redisLockKey, redisLockInfo.getExpireTime()); if (result) { tryNumber += 1; redisLockInfo.setTryNumber(tryNumber); log.info("# redis鎖[key:{}],檢查臨近過期,完成進行第{}次續期,{}次續期后將終止請求", redisLockKey, tryNumber, tryCount); } else { // 這里可能請求線程已經完成,所以續期失敗。 it.remove(); log.info("# redis鎖-EXPIRE[key:{}],請求已完成,無須續期", redisLockKey); } } else if (ttl == -2) { // -2表示key不存在 // 這里可能請求線程已經完成,所以續期失敗。 it.remove(); log.info("# redis鎖-TTL[key:{}],請求已完成,無須續期", redisLockKey); } } }
5、測試(測試過程請繼續往下看)
@RedisLock(key = "test", expire = 10)
@RequestMapping("/test")
public boolean test(@RequestParam String key, @RequestParam long threadSleepTime) {
log.info("# request begin");
try {
Thread.sleep(threadSleepTime);
return redisService.set(key, UUID.randomUUID().toString());
} catch (Exception e) {
log.error("# request fail , ", e);
}
log.info("# request end");
return false;
}
驗證過程(便於測試,我這里設置鎖的時間為10S,便於快速失效,大家測試可以根據實際情況調整):
測試場景一:瀏覽器輸入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=3000
結果分析一:這里會有兩種可能。請求3S完成,1、請求處理時,Job沒掃描到,等掃描到時,請求已經完成;2、請求處理時,Job掃描到了,進行續期一次,然后請求完成。
測試場景二:瀏覽器輸入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=11000
結果分析二:我們請求的線程休眠時間是11秒,而續期JOB是5S掃描一次,由於每次檢測到鎖的剩余時間小於10S(項目中定義的鎖臨近過期界限),所以會續期兩次,最后到11秒,請求完成,由切面清理鎖並返回。
測試場景三:瀏覽器輸入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=60000
結果解析三:在請求處理的60S內,一共續期三次,續期次數用完,請求還未完成,此時請求被JOB主動中斷,返回失敗。
,
測試場景四:瀏覽器輸入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=60000,之后打開多個窗口輸入 http://localhost:8088/dl/test?key=yhc&threadSleepTime=60000
結果解析四:以上三種主要是分了測試續期的邏輯,場景四是為了測試分布式鎖是否有生效。第一個請求會一直請求等待,后面的請求會直接失敗,返回false,從日志也可以看到,后面的請求獲取鎖都是失敗的。然后第一個請求,鎖的次數已續期三次,請求被JOB中斷,也返回失敗,說明分布式鎖生效了。
注意:此處注意不要使用Chrome瀏覽器進行驗證,這里建議大家使用firefox。Chrome瀏覽器同時只能對同一個URL發起一個請求,如果有更多的請求需要排隊。(這個問題排查了半天,一直以為是spring框架或thread.sleep()導致的)
至此完成,大家有問題歡迎留言討論,謝謝。