有關Redisson作為實現分布式鎖,總的分3大模塊來講。
1、Redisson實現分布式鎖原理
2、Redisson實現分布式鎖的源碼解析
3、Redisson實現分布式鎖的項目代碼(可以用於實際項目中)
本文只介紹Redisson如何實現分布式鎖的原理。其它的會在接下來的博客講,最后有關Redisson實現分布式鎖的項目代碼
的博客中會放上項目源碼到GitHub上。
一、高效分布式鎖
當我們在設計分布式鎖的時候,我們應該考慮分布式鎖至少要滿足的一些條件,同時考慮如何高效的設計分布式鎖,這里我認為以下幾點是必須要考慮的。
1、互斥
在分布式高並發的條件下,我們最需要保證,同一時刻只能有一個線程獲得鎖,這是最基本的一點。
2、防止死鎖
在分布式高並發的條件下,比如有個線程獲得鎖的同時,還沒有來得及去釋放鎖,就因為系統故障或者其它原因使它無法執行釋放鎖的命令,導致其它線程都無法獲得鎖,造成死鎖。
所以分布式非常有必要設置鎖的有效時間
,確保系統出現故障后,在一定時間內能夠主動去釋放鎖,避免造成死鎖的情況。
3、性能
對於訪問量大的共享資源,需要考慮減少鎖等待的時間,避免導致大量線程阻塞。
所以在鎖的設計時,需要考慮兩點。
1、鎖的顆粒度要盡量小
。比如你要通過鎖來減庫存,那這個鎖的名稱你可以設置成是商品的ID,而不是任取名稱。這樣這個鎖只對當前商品有效,鎖的顆粒度小。
2、鎖的范圍盡量要小
。比如只要鎖2行代碼就可以解決問題的,那就不要去鎖10行代碼了。
4、重入
我們知道ReentrantLock是可重入鎖,那它的特點就是:同一個線程可以重復拿到同一個資源的鎖。重入鎖非常有利於資源的高效利用。關於這點之后會做演示。
針對以上Redisson都能很好的滿足,下面就來分析下它。
二、Redisson原理分析
為了更好的理解分布式鎖的原理,我這邊自己畫張圖通過這張圖來分析。
1、加鎖機制
線程去獲取鎖,獲取成功: 執行lua腳本,保存數據到redis數據庫。
線程去獲取鎖,獲取失敗: 一直通過while循環嘗試獲取鎖,獲取成功后,執行lua腳本,保存數據到redis數據庫。
2、watch dog自動延期機制
這個比較難理解,找了些許資料感覺也並沒有解釋的很清楚。這里我自己的理解就是:
在一個分布式環境下,假如一個線程獲得鎖后,突然服務器宕機了,那么這個時候在一定時間后這個鎖會自動釋放,你也可以設置鎖的有效時間(不設置默認30秒),這樣的目的主要是防止死鎖的發生。
但在實際開發中會有下面一種情況:
//設置鎖1秒過去 redissonLock.lock("redisson", 1); /** * 業務邏輯需要咨詢2秒 */ redissonLock.release("redisson"); /** * 線程1 進來獲得鎖后,線程一切正常並沒有宕機,但它的業務邏輯需要執行2秒,這就會有個問題,在 線程1 執行1秒后,這個鎖就自動過期了, * 那么這個時候 線程2 進來了。那么就存在 線程1和線程2 同時在這段業務邏輯里執行代碼,這當然是不合理的。 * 而且如果是這種情況,那么在解鎖時系統會拋異常,因為解鎖和加鎖已經不是同一線程了,具體后面代碼演示。 */
所以這個時候看門狗
就出現了,它的作用就是 線程1 業務還沒有執行完,時間就過了,線程1 還想持有鎖的話,就會啟動一個watch dog后台線程,不斷的延長鎖key的生存時間。
注意
正常這個看門狗線程是不啟動的,還有就是這個看門狗啟動后對整體性能也會有一定影響,所以不建議開啟看門狗。
3、為啥要用lua腳本呢?
這個不用多說,主要是如果你的業務邏輯復雜的話,通過封裝在lua腳本中發送給redis,而且redis是單線程的,這樣就保證這段復雜業務邏輯執行的原子性。
4、可重入加鎖機制
Redisson可以實現可重入加鎖機制的原因,我覺得跟兩點有關:
1、Redis存儲鎖的數據類型是 Hash類型 2、Hash數據類型的key值包含了當前線程信息。
下面是redis存儲的數據
這里表面數據類型是Hash類型,Hash類型相當於我們java的 <key,<key1,value>>
類型,這里key是指 'redisson'
它的有效期還有9秒,我們再來看里們的key1值為078e44a3-5f95-4e24-b6aa-80684655a15a:45
它的組成是:
guid + 當前線程的ID。后面的value是就和可重入加鎖有關。
舉圖說明
上面這圖的意思就是可重入鎖的機制,它最大的優點就是相同線程不需要在等待鎖,而是可以直接進行相應操作。
5、Redis分布式鎖的缺點
Redis分布式鎖會有個缺陷,就是在Redis哨兵模式下:
客戶端1
對某個master節點
寫入了redisson鎖,此時會異步復制給對應的 slave節點。但是這個過程中一旦發生 master節點宕機,主備切換,slave節點從變為了 master節點。
這時客戶端2
來嘗試加鎖的時候,在新的master節點上也能加鎖,此時就會導致多個客戶端對同一個分布式鎖完成了加鎖。
這時系統在業務語義上一定會出現問題,導致各種臟數據的產生。
缺陷
在哨兵模式或者主從模式下,如果 master實例宕機的時候,可能導致多個客戶端同時完成加鎖。
有關Redisson實現分布式鎖上一篇博客講了分布式的鎖原理:Redisson實現分布式鎖---原理
這篇主要講RedissonLock和RLock。Redisson分布式鎖的實現是基於RLock接口,RedissonLock實現RLock接口。
一、RLock接口
1、概念
public interface RLock extends Lock, RExpirable, RLockAsync
很明顯RLock是繼承Lock鎖,所以他有Lock鎖的所有特性,比如lock、unlock、trylock等特性,同時它還有很多新特性:強制鎖釋放,帶有效期的鎖,。
2、RLock鎖API
這里針對上面做個整理,這里列舉幾個常用的接口說明
public interface RRLock { //----------------------Lock接口方法----------------------- /** * 加鎖 鎖的有效期默認30秒 */ void lock(); /** * tryLock()方法是有返回值的,它表示用來嘗試獲取鎖,如果獲取成功,則返回true,如果獲取失敗(即鎖已被其他線程獲取),則返回false . */ boolean tryLock(); /** * tryLock(long time, TimeUnit unit)方法和tryLock()方法是類似的,只不過區別在於這個方法在拿不到鎖時會等待一定的時間, * 在時間期限之內如果還拿不到鎖,就返回false。如果如果一開始拿到鎖或者在等待期間內拿到了鎖,則返回true。 * * @param time 等待時間 * @param unit 時間單位 小時、分、秒、毫秒等 */ boolean tryLock(long time, TimeUnit unit) throws InterruptedException; /** * 解鎖 */ void unlock(); /** * 中斷鎖 表示該鎖可以被中斷 假如A和B同時調這個方法,A獲取鎖,B為獲取鎖,那么B線程可以通過 * Thread.currentThread().interrupt(); 方法真正中斷該線程 */ void lockInterruptibly(); //----------------------RLock接口方法----------------------- /** * 加鎖 上面是默認30秒這里可以手動設置鎖的有效時間 * * @param leaseTime 鎖有效時間 * @param unit 時間單位 小時、分、秒、毫秒等 */ void lock(long leaseTime, TimeUnit unit); /** * 這里比上面多一個參數,多添加一個鎖的有效時間 * * @param waitTime 等待時間 * @param leaseTime 鎖有效時間 * @param unit 時間單位 小時、分、秒、毫秒等 */ boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException; /** * 檢驗該鎖是否被線程使用,如果被使用返回True */ boolean isLocked(); /** * 檢查當前線程是否獲得此鎖(這個和上面的區別就是該方法可以判斷是否當前線程獲得此鎖,而不是此鎖是否被線程占有) * 這個比上面那個實用 */ boolean isHeldByCurrentThread(); /** * 中斷鎖 和上面中斷鎖差不多,只是這里如果獲得鎖成功,添加鎖的有效時間 * @param leaseTime 鎖有效時間 * @param unit 時間單位 小時、分、秒、毫秒等 */ void lockInterruptibly(long leaseTime, TimeUnit unit); }
RLock相關接口,主要是新添加了 leaseTime
屬性字段,主要是用來設置鎖的過期時間,避免死鎖。
二、RedissonLock實現類
public class RedissonLock extends RedissonExpirable implements RLock
RedissonLock實現了RLock接口,所以實現了接口的具體方法。這里我列舉幾個方法說明下
1、void lock()方法
@Override public void lock() { try { lockInterruptibly(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } }
發現lock鎖里面進去其實用的是lockInterruptibly
(中斷鎖,表示可以被中斷),而且捕獲異常后用 Thread.currentThread().interrupt()來真正中斷當前線程,其實它們是搭配一起使用的。
具體有關lockInterruptibly()方法講解推薦一個博客。博客
:Lock的lockInterruptibly()
接下來執行流程,這里理下關鍵幾步
/** * 1、帶上默認值調另一個中斷鎖方法 */ @Override public void lockInterruptibly() throws InterruptedException { lockInterruptibly(-1, null); } /** * 2、另一個中斷鎖的方法 */ void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException /** * 3、這里已經設置了鎖的有效時間默認為30秒 (commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout()=30) */ RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); /** * 4、最后通過lua腳本訪問Redis,保證操作的原子性 */ <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); }
那么void lock(long leaseTime, TimeUnit unit)方法其實和上面很相似了,就是從上面第二步開始的。
2、tryLock(long waitTime, long leaseTime, TimeUnit unit)
接口的參數和含義上面已經說過了,現在我們開看下源碼,這里只顯示一些重要邏輯。
@Override public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); long threadId = Thread.currentThread().getId(); Long ttl = tryAcquire(leaseTime, unit, threadId); //1、 獲取鎖同時獲取成功的情況下,和lock(...)方法是一樣的 直接返回True,獲取鎖False再往下走 if (ttl == null) { return true; } //2、如果超過了嘗試獲取鎖的等待時間,當然返回false 了。 time -= System.currentTimeMillis() - current; if (time <= 0) { acquireFailed(threadId); return false; } // 3、訂閱監聽redis消息,並且創建RedissonLockEntry,其中RedissonLockEntry中比較關鍵的是一個 Semaphore屬性對象,用來控制本地的鎖請求的信號量同步,返回的是netty框架的Future實現。 final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); // 阻塞等待subscribe的future的結果對象,如果subscribe方法調用超過了time,說明已經超過了客戶端設置的最大wait time,則直接返回false,取消訂閱,不再繼續申請鎖了。 // 只有await返回true,才進入循環嘗試獲取鎖 if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() { @Override public void operationComplete(Future<RedissonLockEntry> future) throws Exception { if (subscribeFuture.isSuccess()) { unsubscribe(subscribeFuture, threadId); } } }); } acquireFailed(threadId); return false; } //4、如果沒有超過嘗試獲取鎖的等待時間,那么通過While一直獲取鎖。最終只會有兩種結果 //1)、在等待時間內獲取鎖成功 返回true。2)等待時間結束了還沒有獲取到鎖那么返回false。 while (true) { long currentTime = System.currentTimeMillis(); ttl = tryAcquire(leaseTime, unit, threadId); // 獲取鎖成功 if (ttl == null) { return true; } // 獲取鎖失敗 time -= System.currentTimeMillis() - currentTime; if (time <= 0) { acquireFailed(threadId); return false; } } }
重點
tryLock一般用於特定滿足需求的場合,但不建議作為一般需求的分布式鎖,一般分布式鎖建議用void lock(long leaseTime, TimeUnit unit)。因為從性能上考慮,在高並發情況下后者效率是前者的好幾倍
3、unlock()
解鎖的邏輯很簡單。
@Override public void unlock() { // 1.通過 Lua 腳本執行 Redis 命令釋放鎖 Boolean opStatus = commandExecutor.evalWrite(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; " + "end;" + "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " + "return nil;" + "end; " + "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " + "if (counter > 0) then " + "redis.call('pexpire', KEYS[1], ARGV[2]); " + "return 0; " + "else " + "redis.call('del', KEYS[1]); " + "redis.call('publish', KEYS[2], ARGV[1]); " + "return 1; "+ "end; " + "return nil;", Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(Thread.currentThread().getId())); // 2.非鎖的持有者釋放鎖時拋出異常 if (opStatus == null) { throw new IllegalMonitorStateException( "attempt to unlock lock, not locked by current thread by node id: " + id + " thread-id: " + Thread.currentThread().getId()); } // 3.釋放鎖后取消刷新鎖失效時間的調度任務 if (opStatus) { cancelExpirationRenewal(); } }
使用 EVAL 命令執行 Lua 腳本來釋放鎖:
- key 不存在,說明鎖已釋放,直接執行
publish
命令發布釋放鎖消息並返回1
。 - key 存在,但是 field 在 Hash 中不存在,說明自己不是鎖持有者,無權釋放鎖,返回
nil
。 - 因為鎖可重入,所以釋放鎖時不能把所有已獲取的鎖全都釋放掉,一次只能釋放一把鎖,因此執行
hincrby
對鎖的值減一。 - 釋放一把鎖后,如果還有剩余的鎖,則刷新鎖的失效時間並返回
0
;如果剛才釋放的已經是最后一把鎖,則執行del
命令刪除鎖的 key,並發布鎖釋放消息,返回1
。
注意
這里有個實際開發過程中,容易出現很容易出現上面第二步異常,非鎖的持有者釋放鎖時拋出異常。比如下面這種情況
//設置鎖1秒過去 redissonLock.lock("redisson", 1); /** * 業務邏輯需要咨詢2秒 */ redissonLock.release("redisson"); /** * 線程1 進來獲得鎖后,線程一切正常並沒有宕機,但它的業務邏輯需要執行2秒,這就會有個問題,在 線程1 執行1秒后,這個鎖就自動過期了, * 那么這個時候 線程2 進來了。在線程1去解鎖就會拋上面這個異常(因為解鎖和當前鎖已經不是同一線程了) */
有關Redisson實現分布式鎖前面寫了兩篇博客作為該項目落地的鋪墊。
2、Redisson實現分布式鎖(2)—RedissonLock
這篇講下通過Redisson實現分布式鎖的項目實現,我會把項目放到GitHub,該項目可以直接運用於實際開發中,作為分布式鎖使用。
一、項目概述
1、技術架構
項目總體技術選型
SpringBoot2.1.5 + Maven3.5.4 + Redisson3.5.4 + lombok(插件)
2、加鎖方式
該項目支持 自定義注解加鎖
和 常規加鎖
兩種模式
自定義注解加鎖
@DistributedLock(value="goods", leaseTime=5) public String lockDecreaseStock(){ //業務邏輯 }
常規加鎖
//1、加鎖 redissonLock.lock("redisson", 10); //2、業務邏輯 //3、解鎖 redissonLock.unlock("redisson");
3、Redis部署方式
該項目支持四種Redis部署方式
1、單機模式部署
2、集群模式部署
3、主從模式部署
4、哨兵模式部署
該項目已經實現支持上面四種模式,你要采用哪種只需要修改配置文件application.properties
,項目代碼不需要做任何修改。
4、項目整體結構
redis-distributed-lock-core # 核心實現 | ---src | ---com.jincou.redisson |# 通過注解方式 實現分布式鎖 ---annotation |# 配置類實例化RedissonLock ---config |# 放置常量信息 ---constant |# 讀取application.properties信息后,封裝到實體 ---entity |# 支持單機、集群、主從、哨兵 代碼實現 ---strategy redis-distributed-lock-web-test # 針對上面實現類的測試類 | ---src | ---java | ---com.jincou.controller |# 測試 基於注解方式實現分布式鎖 ---AnnotatinLockController.java |# 測試 基於常規方式實現分布式鎖 ---LockController.java ---resources | # 配置端口號 連接redis信息(如果確定部署類型,那么將連接信息放到core項目中) ---application.properties
二、測試
模擬1秒內100個線程
請求接口,來測試結果是否正確。同時測試3中不同的鎖:lock鎖、trylock鎖、注解鎖。
1、lock鎖
/** * 模擬這個是商品庫存 */ public static volatile Integer TOTAL = 10; @GetMapping("lock-decrease-stock") public String lockDecreaseStock() throws InterruptedException { redissonLock.lock("lock", 10); if (TOTAL > 0) { TOTAL--; } Thread.sleep(50); log.info("======減完庫存后,當前庫存===" + TOTAL); //如果該線程還持有該鎖,那么釋放該鎖。如果該線程不持有該鎖,說明該線程的鎖已到過期時間,自動釋放鎖 if (redissonLock.isHeldByCurrentThread("lock")) { redissonLock.unlock("lock"); } return "================================="; }
壓測結果
沒問題,不會超賣!
2、tryLock鎖
/** * 模擬這個是商品庫存 */ public static volatile Integer TOTAL = 10; @GetMapping("trylock-decrease-stock") public String trylockDecreaseStock() throws InterruptedException { if (redissonLock.tryLock("trylock", 10, 5)) { if (TOTAL > 0) { TOTAL--; } Thread.sleep(50); redissonLock.unlock("trylock"); log.info("====tryLock===減完庫存后,當前庫存===" + TOTAL); } else { log.info("[ExecutorRedisson]獲取鎖失敗"); } return "==================================="; }
測試結果
沒有問題 ,不會超賣!
3、注解鎖
/** * 模擬這個是商品庫存 */ public static volatile Integer TOTAL = 10; @GetMapping("annotatin-lock-decrease-stock") @DistributedLock(value="goods", leaseTime=5) public String lockDecreaseStock() throws InterruptedException { if (TOTAL > 0) { TOTAL--; } log.info("===注解模式=== 減完庫存后,當前庫存===" + TOTAL); return "================================="; }
測試結果
沒有問題 ,不會超賣!
通過實驗可以看出,通過這三種模式都可以實現分布式鎖,然后呢?哪個最優。
三、三種鎖的鎖選擇
觀點
最完美的就是lock鎖,因為
1、tryLock鎖是可能會跳過減庫存的操作,因為當過了等待時間還沒有獲取鎖,就會返回false,這顯然很致命! 2、注解鎖只能用於方法上,顆粒度太大,滿足不了方法內加鎖。
1、lock PK tryLock 性能的比較
模擬5秒內1000個線程
分別去壓測這兩個接口,看報告結果!
1)lock鎖
壓測結果 1000個線程平均響應時間為31324。吞吐量 14.7/sec
2)tryLock鎖
壓測結果 1000個線程平均響應時間為28628。吞吐量 16.1/sec
這里只是單次測試,有很大的隨機性。從當前環境單次測試來看,tryLock稍微高點。
2、常見異常 attempt to unlock lock, not ······
在使用RedissonLock鎖時,很容易報這類異常,比如如下操作
//設置鎖1秒過去 redissonLock.lock("redisson", 1); /** * 業務邏輯需要咨詢2秒 */ redissonLock.release("redisson");
上面在並發情況下就會這樣
造成異常原因:
線程1 進來獲得鎖后,但它的業務邏輯需要執行2秒,在 線程1 執行1秒后,這個鎖就自動過期了,那么這個時候
線程2 進來了獲得了鎖。在線程1去解鎖就會拋上面這個異常(因為解鎖和當前鎖已經不是同一線程了)
所以我們需要注意,設置鎖的過期時間不能設置太小,一定要合理,寧願設置大點。
正對上面的異常,可以通過isHeldByCurrentThread()方法,
//如果為false就說明該線程的鎖已經自動釋放,無需解鎖 if (redissonLock.isHeldByCurrentThread("lock")) { redissonLock.unlock("lock"); }
好了,這篇博客就到這了!
至於完整的項目地址見GitHub。
如果對您能有幫助,就給個星星吧,哈哈!
GitHub地址
https://github.com/yudiandemingzi/spring-boot-distributed-redisson