介紹與配置
Redisson官方文檔:https://github.com/redisson/redisson/wiki/Redisson%E9%A1%B9%E7%9B%AE%E4%BB%8B%E7%BB%8D
Springboot 自動配置類: RedissonAutoConfiguration
pom配置:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> </dependency> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson-spring-boot-starter</artifactId> <version>3.13.6</version> </dependency>
如果什么都不配置的話,會默認使用單Redis節點模式,代碼中直接就可以使用 RedissonClient
具體配置可參考官方文檔:https://github.com/redisson/redisson/wiki/2.-%E9%85%8D%E7%BD%AE%E6%96%B9%E6%B3%95
分布式鎖測試
@Slf4j @SpringBootTest(classes = DemoWebApplication.class) public class RedissonTest { @Resource private RedissonClient redissonClient; @Test public void redissonTest() throws InterruptedException { log.info("===redissonTest====start==============="); for (int i = 0; i < 10; i++) { new Thread(() -> { lockTest(); }).start(); } Thread.sleep(30000); log.info("===redissonTest====end==============="); }
private void tryLockTest() {
// 見下文
}
private void lockTest() {
// 見下文
}
}
private void tryLockTest() { String threadName = Thread.currentThread().getName(); log.info("===Thread=={}===start===", threadName); RLock lock = redisson.getLock("DistributedRedisLockTest"); // 嘗試加鎖,最多等待10秒,上鎖以后30秒自動解鎖 boolean lockFlag = false; try { // 嘗試去加鎖,10秒沒獲取到鎖,則返回false // res = lock.tryLock(10, TimeUnit.SECONDS); // 嘗試去加鎖,10秒沒獲取到鎖,則返回false,獲取到則返回true,獲取到鎖后30秒自動釋放 // 當waitTime設置為0時,就相當於setNx,獲取不到鎖直接退出 lockFlag = lock.tryLock(5, 1, TimeUnit.SECONDS); if (!lockFlag) { log.info("===Thread=={}==res={}==沒有獲取到鎖,退出===", threadName, lockFlag); return; } log.info("===Thread=={}============getLock===", threadName); // 模擬業務邏輯 Thread.sleep(2000); } catch (Exception e) { log.error("執行異常,e:{}", ExceptionUtils.getStackTrace(e)); } finally { log.info("===Thread=={}==========isHeldByCurrentThread={}", threadName, lock.isHeldByCurrentThread()); // 釋放鎖也可能出現異常,比如業務代碼沒執行完,鎖就過期,此時進行釋放會拋異常,加個當前線程是否持有所的判斷 if (lock.isHeldByCurrentThread()) { lock.unlock(); } } log.info("===Thread=={}==lockFlag={}=end===", threadName, lockFlag); }
private void lockTest() { String threadName = Thread.currentThread().getName(); log.info("===Thread=={}===start===", threadName); RLock lock = redisson.getLock("DistributedRedisLockTest"); // lock表示去加鎖,加鎖成功,沒有返回值,繼續執行下面代碼;加鎖失敗,它會一直阻塞,直到鎖被釋放,再繼續往下執行 // lock.lock(); // 1秒自動釋放時間,但是后續執行unlock操作時會報錯(自己只能解鎖自己的,第一個線程釋放之后執行到unlock方法,但是此時鎖已經是第二個線程的了) lock.lock(1, TimeUnit.SECONDS); log.info("===Thread=={}============getLock===", threadName); try { Thread.sleep(2000); } catch (Exception e) { log.error("執行異常,e:{}", ExceptionUtils.getStackTrace(e)); } finally { log.info("===Thread=={}==========isHeldByCurrentThread={}", threadName, lock.isHeldByCurrentThread()); // 釋放鎖也可能出現異常,比如業務代碼沒執行完,鎖就過期,此時進行釋放會拋異常,加個當前線程是否持有所的判斷 if (lock.isHeldByCurrentThread()) { lock.unlock(); } } log.info("===Thread=={}===end===", threadName); }
原理
本文中Redisson版本為 redisson-spring-boot-starter 3.13.6
先看下接口方法:
public interface RRLock extends Lock, RLockAsync{ //----------------------Lock接口方法----------------------- /** * 加鎖 鎖的有效期默認30秒 */ void lock(); /** * tryLock()方法是有返回值的,它表示用來嘗試獲取鎖,如果獲取成功,則返回true,如果獲取失敗(即鎖已被其他線程獲取),則返回false . */ boolean tryLock(); /** * tryLock(long time, TimeUnit unit)方法和tryLock()方法是類似的,只不過區別在於這個方法在拿不到鎖時會等待一定的時間, * 在時間期限之內如果還拿不到鎖,就返回false。如果如果一開始拿到鎖或者在等待期間內拿到了鎖,則返回true。 * * @param time 等待時間 * @param unit 時間單位 小時、分、秒、毫秒等 */ boolean tryLock(long time, TimeUnit unit) throws InterruptedException; /** * 解鎖 */ void unlock(); /** * 中斷鎖 表示該鎖可以被中斷 假如A和B同時調這個方法,A獲取鎖,B為獲取鎖,那么B線程可以通過 * Thread.currentThread().interrupt(); 方法真正中斷該線程 */ void lockInterruptibly(); //----------------------RLock接口方法----------------------- /** * 加鎖 上面是默認30秒這里可以手動設置鎖的有效時間 * * @param leaseTime 鎖有效時間 * @param unit 時間單位 小時、分、秒、毫秒等 */ void lock(long leaseTime, TimeUnit unit); /** * 這里比上面多一個參數,多添加一個鎖的有效時間 * * @param waitTime 等待時間 * @param leaseTime 鎖有效時間 * @param unit 時間單位 小時、分、秒、毫秒等 */ boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; /** * 檢驗該鎖是否被線程使用,如果被使用返回True */ boolean isLocked(); /** * 檢查當前線程是否獲得此鎖(這個和上面的區別就是該方法可以判斷是否當前線程獲得此鎖,而不是此鎖是否被線程占有) * 這個比上面那個實用 */ boolean isHeldByCurrentThread(); /** * 中斷鎖 和上面中斷鎖差不多,只是這里如果獲得鎖成功,添加鎖的有效時間 * @param leaseTime 鎖有效時間 * @param unit 時間單位 小時、分、秒、毫秒等 */ void lockInterruptibly(long leaseTime, TimeUnit unit); }
-
tryLock方法
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
// 1.嘗試申請鎖,返回還剩余的鎖過期時間
Long ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// 2.如果為空,表示申請鎖成功, 返回true
if (ttl == null) {
return true;
}
// 3.申請鎖的耗時如果大於等於最大等待時間,則申請鎖失敗
time -= System.currentTimeMillis() - current;
if (time <= 0) {
// 通過 promise.trySuccess 設置異步執行的結果為null
// Promise從Uncompleted-->Completed ,通知 Future 異步執行已完成
acquireFailed(waitTime, unit, threadId);
return false;
}
current = System.currentTimeMillis();
/**
* 4.訂閱鎖釋放事件,並通過await方法阻塞等待鎖釋放,有效的解決了無效的鎖申請浪費資源的問題:
* 基於信息量,當鎖被其它資源占用時,當前線程通過 Redis 的 channel 訂閱鎖的釋放事件,一旦鎖釋放會發消息通知待等待的線程進行競爭
* 當 this.await返回false,說明等待時間已經超出獲取鎖最大等待時間,取消訂閱並返回獲取鎖失敗
* 當 this.await返回true,進入循環嘗試獲取鎖
*/
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
// await 方法內部是用CountDownLatch來實現阻塞,獲取subscribe異步執行的結果(應用了Netty 的 Future)
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(waitTime, unit, threadId);
return false;
}
try {
// 計算獲取鎖的總耗時,如果大於等於最大等待時間,則獲取鎖失敗
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
/**
* 5.收到鎖釋放的信號后,在最大等待時間之內,循環一次接着一次的嘗試獲取鎖
* 獲取鎖成功,則立馬返回true,
* 若在最大等待時間之內還沒獲取到鎖,則認為獲取鎖失敗,返回false結束循環
*/while (true) {
long currentTime = System.currentTimeMillis();
// 再次嘗試申請鎖
ttl = tryAcquire(waitTime, leaseTime, unit, threadId);
// 成功獲取鎖則直接返回true結束循環
if (ttl == null) {
return true;
}
// 超過最大等待時間則返回false結束循環,獲取鎖失敗
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
// 6.阻塞等待鎖(通過信號量(共享鎖)阻塞,等待解鎖消息):
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
// 如果剩余時間(ttl)小於wait time ,就在 ttl 時間內,從Entry的信號量獲取一個許可(除非被中斷或者一直沒有可用的許可)。 subscribeFuture.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
// 則就在wait time 時間范圍內等待可以通過信號量
subscribeFuture.getNow().getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
// 7.更新剩余的等待時間(最大等待時間-已經消耗的阻塞時間)
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(waitTime, unit, threadId);
return false;
}
}
} finally {
// 8.無論是否獲得鎖,都要取消訂閱解鎖消息
unsubscribe(subscribeFuture, threadId);
}
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
// 如果指定了失效時間,就按指定的失效時間執行,然后返回 if (leaseTime != -1) { return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG); }
// 如果沒有指定失效時間(leaseTime=-1),則默認配置30秒 (getLockWatchdogTimeOut()=30) RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime, commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); // 加鎖完畢之后,啟動看門狗線程,定時的延期失效時間(定時任務為 internalLockLeaseTime / 3 毫秒之后執行)
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) { return; } if (ttlRemaining == null) {
// 啟動看門狗任務 scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; }
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); // 通過lua腳本訪問Redis,保證操作的原子性, 以及達到批量操作的效果,提升性能
// KEYS[1] :表示分布式鎖的key
// ARGV[1] :鎖的租約時間(持有鎖的有效時間),默認30s;
// ARGV[2] :獲取鎖時set的唯一值 value,即UUID:threadId。
return evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 如果緩存中的key不存在,則設置唯一標識和超時時間,初始化value=1
// 返回空值 nil ,表示獲取鎖成功
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果key已經存在,並且value也匹配(重入情況),表示是當前線程持有的鎖,則執行 hincrby 命令,重入次數加1,並且設置失效時間
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " +
// 如果key已經存在,但是value不匹配,說明鎖已經被其他線程持有,通過 pttl 命令獲取鎖的剩余存活時間並返回,至此獲取鎖失敗 "return redis.call('pttl', KEYS[1]);",
// 這三個參數分別對應KEYS[1],ARGV[1]和ARGV[2] Collections.singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }

- unlock方法
調用關系:unlock —> unlockAsync —> unlockInnerAsync,unlockInnerAsync是解鎖的核心代碼
@Override public RFuture<Void> unlockAsync(long threadId) { RPromise<Void> result = new RedissonPromise<Void>(); RFuture<Boolean> future = unlockInnerAsync(threadId); future.onComplete((opStatus, e) -> {
// 釋放鎖后取消刷新鎖失效時間的調度任務 cancelExpirationRenewal(threadId); if (e != null) { result.tryFailure(e); return; } // 非鎖的持有者釋放鎖時拋出異常 if (opStatus == null) { IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + threadId); result.tryFailure(cause); return; } result.trySuccess(null); }); return result; } // 通過 EVAL 命令執行 Lua 腳本獲取鎖,保證了原子性
// KEYS[1] :需要加鎖的key,這里需要是字符串類型。
// KEYS[2] :redis消息的ChannelName,一個分布式鎖對應唯一的一個channelName:“redisson_lock__channel__{” + getName() + “}”
// ARGV[1] :reids消息體,這里只需要一個字節的標記就可以,主要標記redis的key已經解鎖,再結合redis的Subscribe,能喚醒其他訂閱解鎖消息的客戶端線程申請鎖。
// ARGV[2] :鎖的超時時間,防止死鎖
// ARGV[3] :鎖的唯一標識,也就是剛才介紹的 id(UUID.randomUUID()) + “:” + threadId
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果分布式鎖存在,但是value不匹配,表示鎖已經被其他線程占用,無權釋放鎖,那么直接返回空值(解鈴還須系鈴人) "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " +
// 如果value匹配,則就是當前線程占有分布式鎖,那么將重入次數減1 "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 重入次數減1后的值如果大於0,表示分布式鎖有重入過,那么只能更新失效時間,還不能刪除 "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " +
// 重入次數減1后的值如果為0,這時就可以刪除這個KEY,並發布解鎖消息,返回1 "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end; " + "return nil;",
// 這5個參數分別對應KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3] Arrays.asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId)); }
注意這里有個實際開發過程中,容易出現很容易出現上面第二步異常,非鎖的持有者釋放鎖時拋出異常。
解鎖消息通知:
之前加鎖的時候源碼里寫過,如果沒獲取鎖成功,就監聽這個鎖,監聽它什么時候釋放,所以解鎖的時候,要發出這個消息通知,讓其他想獲取鎖的客戶端知道。
public class LockPubSub extends PublishSubscribe<RedissonLockEntry> { public static final Long UNLOCK_MESSAGE = 0L; public static final Long READ_UNLOCK_MESSAGE = 1L; public LockPubSub(PublishSubscribeService service) { super(service); } @Override protected RedissonLockEntry createEntry(RPromise<RedissonLockEntry> newPromise) { return new RedissonLockEntry(newPromise); } @Override protected void onMessage(RedissonLockEntry value, Long message) { /** * 判斷是否是解鎖消息 */ if (message.equals(UNLOCK_MESSAGE)) { Runnable runnableToExecute = value.getListeners().poll(); if (runnableToExecute != null) { runnableToExecute.run(); } /** * 釋放一個信號量,喚醒等待的entry.getLatch().tryAcquire去再次嘗試申請鎖 */ value.getLatch().release(); } else if (message.equals(READ_UNLOCK_MESSAGE)) { while (true) { /** * 如果還有其他Listeners回調,則也喚醒執行 */ Runnable runnableToExecute = value.getListeners().poll(); if (runnableToExecute == null) { break; } runnableToExecute.run(); } value.getLatch().release(value.getLatch().getQueueLength()); } } }
大體流程圖:

特點
-
互斥性。在任意時刻,只有一個客戶端能持有鎖,也叫唯一性。 -
不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續其他客戶端能加鎖。 -
防誤刪。加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了,即不能誤解鎖。(業務執行時間過長,超過鎖失效時間,鎖被釋放,第二個線程獲取鎖,此時第一個線程執行到釋放鎖代碼時,不能刪除第二個線程的鎖)
-
看門狗機制。延長過期時間(沒有設置過期時間的情況,leaseTime=-1,默認失效時間為30秒,啟動看門狗線程,定時檢查是否需要延長時間scheduleExpirationRenewal)。
-
具有可用性、容錯性。只要大多數Redis節點正常運行,客戶端就能夠獲取和釋放鎖。 - 可重入性。相同線程不需要在等待鎖,而是可以直接進行相應操作
- 鎖種類多樣。可重入鎖、公平鎖、聯鎖、紅鎖、讀寫鎖
- 可阻塞等待。
存在的問題
分布式架構中的CAP理論,分布式系統只能同時滿足兩個: 一致性(Consistency)、可用性(Availability)、分區容錯性(Partition tolerance)
- Redisson分布式鎖是AP模式,當鎖存在的redis節點宕機,可能會被誤判為鎖失效,或者沒有加鎖。(Zookeeper實現的分布式鎖,是CP理論)
因為在工作中Redis都是集群部署的,所以要考慮集群節點掛掉的問題。給大家舉個例子:
- A客戶端請求主節點獲取到了鎖
- 主節點掛掉了,但是還沒把鎖的信息同步給其他從節點
- 由於主節點掛了,這時候開始主從切換,從節點成為主節點繼續工作,但是新的主節點上,沒有A客戶端的加鎖信息
- 這時候B客戶端來加鎖,因為目前是一個新的主節點,上面沒有其他客戶端加鎖信息,所以B客戶端獲取鎖成功
- 這時候就存在問題了,A和B兩個客戶端同時都持有鎖,同時在執行代碼,那么這時候分布式鎖就失效了。
這里大家會有疑問了,為啥官方給出一個分布式鎖的實現,卻不解決這個問題呢,因為發生這種情況的幾率不大,而且解決這個問題的成本有點小高。
-- 解決辦法:
- tradoff,分布式鎖的redis采用單機部署,分布式鎖專用
- RedLock: RedLock算法思想,意思是不能只在一個redis實例上創建鎖,應該是在多個redis實例上創建鎖,n / 2 + 1,必須在大多數redis節點上都成功創建鎖,才能算這個整體的RedLock加鎖成功,避免說僅僅在一個redis實例上加鎖而帶來的問題。
- 如果對鎖比較關注,一致性要求比較高,可以使用ZK實現的分布式鎖
與Zookeeper實現的分布式鎖比較
TODO
總結
- 如果考慮各種網絡、宕機等原因,很多問題需要考慮,問題會變的復雜,其實分布式鎖的應用場景不多,很多情況可以繞開分布式鎖,使用其他方式解決,比如 隊列,異步,響應式
- 個人經驗:分布式鎖的場景,更多的應用是一個操作不能同時多處進行,不能短時間內重復執行,需要冪等操作等場景,比如:防止快速的重復提交,mq與定時任務雙線更改狀態,防止消息重復消費 等等。這些情況一般使用setNx即可解決
- 所謂的減庫存其實也用不到分布式鎖
參考:
Redisson實現分布式鎖(2)—RedissonLock
