Redis分布式鎖-這一篇全了解(Redission實現分布式鎖完美方案)


前言

在某些場景中,多個進程必須以互斥的方式獨占共享資源,這時用分布式鎖是最直接有效的。

隨着技術快速發展,數據規模增大,分布式系統越來越普及,一個應用往往會部署在多台機器上(多節點),在有些場景中,為了保證數據不重復,要求在同一時刻,同一任務只在一個節點上運行,即保證某一方法同一時刻只能被一個線程執行。在單機環境中,應用是在同一進程下的,只需要保證單進程多線程環境中的線程安全性,通過 JAVA 提供的 volatile、ReentrantLock、synchronized 以及 concurrent 並發包下一些線程安全的類等就可以做到。而在多機部署環境中,不同機器不同進程,就需要在多進程下保證線程的安全性了。因此,分布式鎖應運而生。

以往的工作中看到或用到幾種實現方案,有基於zk的,也有基於redis的。由於實現上邏輯不嚴謹,線上時不時會爆出幾個死鎖case。那么,究竟什么樣的分布式鎖實現,才算是比較好的方案?

常見分布式鎖方案對比

分類 方案 實現原理 優點 缺點
基於數據庫 基於mysql 表唯一索引 1.表增加唯一索引
2.加鎖:執行insert語句,若報錯,則表明加鎖失敗
3.解鎖:執行delete語句
完全利用DB現有能力,實現簡單 1.鎖無超時自動失效機制,有死鎖風險
2.不支持鎖重入,不支持阻塞等待
3.操作數據庫開銷大,性能不高
基於MongoDB findAndModify原子操作 1.加鎖:執行findAndModify原子命令查找document,若不存在則新增
2.解鎖:刪除document
實現也很容易,較基於MySQL唯一索引的方案,性能要好很多 1.大部分公司數據庫用MySQL,可能缺乏相應的MongoDB運維、開發人員
2.鎖無超時自動失效機制
基於分布式協調系統 基於ZooKeeper 1.加鎖:在/lock目錄下創建臨時有序節點,判斷創建的節點序號是否最小。若是,則表示獲取到鎖;否,則則watch /lock目錄下序號比自身小的前一個節點
2.解鎖:刪除節點
1.由zk保障系統高可用
2.Curator框架已原生支持系列分布式鎖命令,使用簡單
需單獨維護一套zk集群,維保成本高
基於緩存 基於redis命令 1. 加鎖:執行setnx,若成功再執行expire添加過期時間
2. 解鎖:執行delete命令
實現簡單,相比數據庫和分布式系統的實現,該方案最輕,性能最好 1.setnx和expire分2步執行,非原子操作;若setnx執行成功,但expire執行失敗,就可能出現死鎖
2.delete命令存在誤刪除非當前線程持有的鎖的可能
3.不支持阻塞等待、不可重入
基於redis Lua腳本能力 1. 加鎖:執行SET lock_name random_value EX seconds NX 命令

2. 解鎖:執行Lua腳本,釋放鎖時驗證random_value 
-- ARGV[1]為random_value,  KEYS[1]為lock_name

if redis.call("get", KEYS[1]) == ARGV[1] then

    return redis.call("del",KEYS[1])

else

    return 0

end

同上;實現邏輯上也更嚴謹,除了單點問題,生產環境采用用這種方案,問題也不大。 不支持鎖重入,不支持阻塞等待

 

表格中對比了幾種常見的方案,redis+lua基本可應付工作中分布式鎖的需求。然而,當偶然看到redisson分布式鎖實現方案(傳送門),相比以上方案,redisson保持了簡單易用、支持鎖重入、支持阻塞等待、Lua腳本原子操作,不禁佩服作者精巧的構思和高超的編碼能力。下面就來學習下redisson這個牛逼框架,是怎么實現的。

分布式鎖需滿足四個條件

首先,為了確保分布式鎖可用,我們至少要確保鎖的實現同時滿足以下四個條件:

  1. 互斥性。在任意時刻,只有一個客戶端能持有鎖。
  2. 不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續其他客戶端能加鎖。
  3. 解鈴還須系鈴人。加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了,即不能誤解鎖。
  4. 具有容錯性。只要大多數Redis節點正常運行,客戶端就能夠獲取和釋放鎖。

Redisson分布式鎖的實現

Redisson 分布式重入鎖用法

Redisson 支持單點模式、主從模式、哨兵模式、集群模式,這里以單點模式為例:


    
    
   
   
           
  1. // 1.構造redisson實現分布式鎖必要的Config
  2. Config config = new Config();
  3. config.useSingleServer().setAddress( "redis://127.0.0.1:5379").setPassword( "123456").setDatabase( 0);
  4. // 2.構造RedissonClient
  5. RedissonClient redissonClient = Redisson.create(config);
  6. // 3.獲取鎖對象實例(無法保證是按線程的順序獲取到)
  7. RLock rLock = redissonClient.getLock(lockKey);
  8. try {
  9. /**
  10. * 4.嘗試獲取鎖
  11. * waitTimeout 嘗試獲取鎖的最大等待時間,超過這個值,則認為獲取鎖失敗
  12. * leaseTime 鎖的持有時間,超過這個時間鎖會自動失效(值應設置為大於業務處理的時間,確保在鎖有效期內業務能處理完)
  13. */
  14. boolean res = rLock.tryLock(( long)waitTimeout, ( long)leaseTime, TimeUnit.SECONDS);
  15. if (res) {
  16. //成功獲得鎖,在這里處理業務
  17. }
  18. } catch (Exception e) {
  19. throw new RuntimeException( "aquire lock fail");
  20. } finally{
  21. //無論如何, 最后都要解鎖
  22. rLock.unlock();
  23. }

redisson這個框架重度依賴了Lua腳本和Netty,代碼很牛逼,各種Future及FutureListener的異步、同步操作轉換。

自己先思考下,如果要手寫一個分布式鎖組件,怎么做?肯定要定義2個接口:加鎖、解鎖;大道至簡,redisson的作者就是在加鎖和解鎖的執行層面采用Lua腳本,逼格高,而且重要有原子性保證啊。當然,redisson的作者畢竟牛逼,加鎖和解鎖過程中還巧妙地利用了redis的發布訂閱功能,后面會講到。下面先對加鎖和解鎖Lua腳本了解下。

加鎖&解鎖Lua腳本

加鎖、解鎖Lua腳本是redisson分布式鎖實現最重要的組成部分。首先不看代碼,先研究下Lua腳本都是什么邏輯

1、加鎖Lua腳本

  • 腳本入參
參數 示例值 含義
KEY個數 1 KEY個數
KEYS[1] my_first_lock_name 鎖名
ARGV[1] 60000 持有鎖的有效時間:毫秒
ARGV[2] 58c62432-bb74-4d14-8a00-9908cc8b828f:1 唯一標識:獲取鎖時set的唯一值,實現上為redisson客戶端ID(UUID)+線程ID
  • 腳本內容

    
    
   
   
           
  1. -- 若鎖不存在:則新增鎖,並設置鎖重入計數為 1、設置鎖過期時間
  2. if (redis.call( 'exists', KEYS[ 1]) == 0) then
  3. redis.call( 'hset', KEYS[ 1], ARGV[ 2], 1);
  4. redis.call( 'pexpire', KEYS[ 1], ARGV[ 1]);
  5. return nil;
  6. end;
  7. -- 若鎖存在,且唯一標識也匹配:則表明當前加鎖請求為鎖重入請求,故鎖重入計數+ 1,並再次設置鎖過期時間
  8. if (redis.call( 'hexists', KEYS[ 1], ARGV[ 2]) == 1) then
  9. redis.call( 'hincrby', KEYS[ 1], ARGV[ 2], 1);
  10. redis.call( 'pexpire', KEYS[ 1], ARGV[ 1]);
  11. return nil;
  12. end;
  13. -- 若鎖存在,但唯一標識不匹配:表明鎖是被其他線程占用,當前線程無權解他人的鎖,直接返回鎖剩余過期時間
  14. return redis.call( 'pttl', KEYS[ 1]);
  • 腳本解讀

Q:返回nil、返回剩余過期時間有什么目的? 
A:當且僅當返回nil,才表示加鎖成功;客戶端需要感知加鎖是否成功的結果

2、解鎖Lua腳本

  • 腳本入參
參數 示例值 含義
KEY個數 2 KEY個數
KEYS[1] my_first_lock_name 鎖名
KEYS[2] redisson_lock__channel:{my_first_lock_name} 解鎖消息PubSub頻道
ARGV[1] 0 redisson定義0表示解鎖消息
ARGV[2] 30000 設置鎖的過期時間;默認值30秒
ARGV[3] 58c62432-bb74-4d14-8a00-9908cc8b828f:1 唯一標識;同加鎖流程
  • 腳本內容

    
    
   
   
           
  1. -- 若鎖不存在:則直接廣播解鎖消息,並返回 1
  2. if (redis.call( 'exists', KEYS[ 1]) == 0) then
  3. redis.call( 'publish', KEYS[ 2], ARGV[ 1]);
  4. return 1;
  5. end;
  6. -- 若鎖存在,但唯一標識不匹配:則表明鎖被其他線程占用,當前線程不允許解鎖其他線程持有的鎖
  7. if (redis.call( 'hexists', KEYS[ 1], ARGV[ 3]) == 0) then
  8. return nil;
  9. end;
  10. -- 若鎖存在,且唯一標識匹配:則先將鎖重入計數減 1
  11. local counter = redis.call( 'hincrby', KEYS[ 1], ARGV[ 3], - 1);
  12. if (counter > 0) then
  13. -- 鎖重入計數減 1后還大於 0:表明當前線程持有的鎖還有重入,不能進行鎖刪除操作,但可以友好地幫忙設置下過期時期
  14. redis.call( 'pexpire', KEYS[ 1], ARGV[ 2]);
  15. return 0;
  16. else
  17. -- 鎖重入計數已為 0:間接表明鎖已釋放了。直接刪除掉鎖,並廣播解鎖消息,去喚醒那些爭搶過鎖但還處於阻塞中的線程
  18. redis.call( 'del', KEYS[ 1]);
  19. redis.call( 'publish', KEYS[ 2], ARGV[ 1]);
  20. return 1;
  21. end;
  22. return nil;

 

  • 腳本解讀

 

Q1:廣播解鎖消息有什么用? 
A:是為了通知其他爭搶鎖阻塞住的線程,從阻塞中解除,並再次去爭搶鎖。

Q2:返回值0、1、nil有什么不一樣? 
A:當且僅當返回1,才表示當前請求真正觸發了解鎖Lua腳本;但客戶端又並不關心解鎖請求的返回值,好像沒什么用?

源碼搞起

1、加鎖流程源碼

讀加鎖源碼時,可以把tryAcquire(leaseTime, unit, threadId)方法直接視為執行加鎖Lua腳本。直接進入org.redisson.RedissonLock#tryLock(long, long, java.util.concurrent.TimeUnit)源碼


    
    
   
   
           
  1. @Override
  2. public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  3. // 獲取鎖能容忍的最大等待時長
  4. long time = unit.toMillis(waitTime);
  5. long current = System.currentTimeMillis();
  6. final long threadId = Thread.currentThread().getId();
  7. // 【核心點1】嘗試獲取鎖,若返回值為null,則表示已獲取到鎖
  8. Long ttl = tryAcquire(leaseTime, unit, threadId);
  9. // lock acquired
  10. if (ttl == null) {
  11. return true;
  12. }
  13. // 還可以容忍的等待時長=獲取鎖能容忍的最大等待時長 - 執行完上述操作流逝的時間
  14. time -= (System.currentTimeMillis() - current);
  15. if (time <= 0) {
  16. acquireFailed(threadId);
  17. return false;
  18. }
  19. current = System.currentTimeMillis();
  20. // 【核心點2】訂閱解鎖消息,見org.redisson.pubsub.LockPubSub#onMessage
  21. /**
  22. * 4.訂閱鎖釋放事件,並通過await方法阻塞等待鎖釋放,有效的解決了無效的鎖申請浪費資源的問題:
  23. * 基於信息量,當鎖被其它資源占用時,當前線程通過 Redis 的 channel 訂閱鎖的釋放事件,一旦鎖釋放會發消息通知待等待的線程進行競爭
  24. * 當 this.await返回false,說明等待時間已經超出獲取鎖最大等待時間,取消訂閱並返回獲取鎖失敗
  25. * 當 this.await返回true,進入循環嘗試獲取鎖
  26. */
  27. final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
  28. //await 方法內部是用CountDownLatch來實現阻塞,獲取subscribe異步執行的結果(應用了Netty 的 Future)
  29. if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
  30. if (!subscribeFuture.cancel( false)) {
  31. subscribeFuture.addListener( new FutureListener<RedissonLockEntry>() {
  32. @Override
  33. public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
  34. if (subscribeFuture.isSuccess()) {
  35. unsubscribe(subscribeFuture, threadId);
  36. }
  37. }
  38. });
  39. }
  40. acquireFailed(threadId);
  41. return false;
  42. }
  43. // 訂閱成功
  44. try {
  45. // 還可以容忍的等待時長=獲取鎖能容忍的最大等待時長 - 執行完上述操作流逝的時間
  46. time -= (System.currentTimeMillis() - current);
  47. if (time <= 0) {
  48. // 超出可容忍的等待時長,直接返回獲取鎖失敗
  49. acquireFailed(threadId);
  50. return false;
  51. }
  52. while ( true) {
  53. long currentTime = System.currentTimeMillis();
  54. // 嘗試獲取鎖;如果鎖被其他線程占用,就返回鎖剩余過期時間【同上】
  55. ttl = tryAcquire(leaseTime, unit, threadId);
  56. // lock acquired
  57. if (ttl == null) {
  58. return true;
  59. }
  60. time -= (System.currentTimeMillis() - currentTime);
  61. if (time <= 0) {
  62. acquireFailed(threadId);
  63. return false;
  64. }
  65. // waiting for message
  66. currentTime = System.currentTimeMillis();
  67. // 【核心點3】根據鎖TTL,調整阻塞等待時長;
  68. // 注意:這里實現非常巧妙,1、latch其實是個信號量Semaphore,調用其tryAcquire方法會讓當前線程阻塞一段時間,避免了在while循環中頻繁請求獲取鎖;
  69. //2、該Semaphore的release方法,會在訂閱解鎖消息的監聽器消息處理方法org.redisson.pubsub.LockPubSub#onMessage調用;當其他線程釋放了占用的鎖,會廣播解鎖消息,監聽器接收解鎖消息,並釋放信號量,最終會喚醒阻塞在這里的線程。
  70. if (ttl >= 0 && ttl < time) {
  71. getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
  72. } else {
  73. getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
  74. }
  75. time -= (System.currentTimeMillis() - currentTime);
  76. if (time <= 0) {
  77. acquireFailed(threadId);
  78. return false;
  79. }
  80. }
  81. } finally {
  82. // 取消解鎖消息的訂閱
  83. unsubscribe(subscribeFuture, threadId);
  84. }
  85. }

接下的再獲取鎖方法 tryAcquire的實現,真的就是執行Lua腳本!


    
    
   
   
           
  1. private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
  2. // tryAcquireAsync異步執行Lua腳本,get方法同步獲取返回結果
  3. return get(tryAcquireAsync(leaseTime, unit, threadId));
  4. }
  5. // 見org.redisson.RedissonLock#tryAcquireAsync
  6. private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
  7. if (leaseTime != - 1) {
  8. // 實質是異步執行加鎖Lua腳本
  9. return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
  10. }
  11. RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
  12. ttlRemainingFuture.addListener( new FutureListener<Long>() {
  13. @Override
  14. public void operationComplete(Future<Long> future) throws Exception {
  15. //先判斷這個異步操作有沒有執行成功,如果沒有成功,直接返回,如果執行成功了,就會同步獲取結果
  16. if (!future.isSuccess()) {
  17. return;
  18. }
  19. Long ttlRemaining = future.getNow();
  20. // lock acquired
  21. //如果ttlRemaining為null,則會執行一個定時調度的方法scheduleExpirationRenewal
  22. if (ttlRemaining == null) {
  23. scheduleExpirationRenewal(threadId);
  24. }
  25. }
  26. });
  27. return ttlRemainingFuture;
  28. }
  29. // 見org.redisson.RedissonLock#tryLockInnerAsync
  30. <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  31. internalLockLeaseTime = unit.toMillis(leaseTime);
  32. return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
  33. "if (redis.call('exists', KEYS[1]) == 0) then " +
  34. "redis.call('hset', KEYS[1], ARGV[2], 1); " +
  35. "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  36. "return nil; " +
  37. "end; " +
  38. "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  39. "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
  40. "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  41. "return nil; " +
  42. "end; " +
  43. "return redis.call('pttl', KEYS[1]);",
  44. Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
  45. }

加鎖過程小結

1、鎖其實也是一種資源,各線程爭搶鎖操作對應到redisson中就是爭搶着去創建一個hash結構,誰先創建就代表誰獲得鎖;hash的名稱為鎖名,hash里面內容僅包含一條鍵值對,鍵為redisson客戶端唯一標識+持有鎖線程id,值為鎖重入計數;給hash設置的過期時間就是鎖的過期時間。放個圖直觀感受下:

2、加鎖流程核心就3步 
Step1:嘗試獲取鎖,這一步是通過執行加鎖Lua腳本來做; 
Step2:若第一步未獲取到鎖,則去訂閱解鎖消息,當獲取鎖到剩余過期時間后,調用信號量方法阻塞住,直到被喚醒或等待超時 
Step3:一旦持有鎖的線程釋放了鎖,就會廣播解鎖消息。於是,第二步中的解鎖消息的監聽器會釋放信號量,獲取鎖被阻塞的那些線程就會被喚醒,並重新嘗試獲取鎖。

比如 RedissonLock中的變量internalLockLeaseTime,默認值是30000毫秒,還有調用tryLockInnerAsync()傳入的一個從連接管理器獲取的getLockWatchdogTimeout(),他的默認值也是30000毫秒,這些都和redisson官方文檔所說的watchdog機制有關,看門狗,還是很形象的描述這一機制,那么看門狗到底做了什么,為什么這么做,來看下核心代碼.

先思考一個問題,假設在一個分布式環境下,多個服務實例請求獲取鎖,其中服務實例1成功獲取到了鎖,在執行業務邏輯的過程中,服務實例突然掛掉了或者hang住了,那么這個鎖會不會釋放,什么時候釋放?回答這個問題,自然想起來之前我們分析的lua腳本,其中第一次加鎖的時候使用pexpire給鎖key設置了過期時間,默認30000毫秒,由此來看如果服務實例宕機了,鎖最終也會釋放,其他服務實例也是可以繼續獲取到鎖執行業務。但是要是30000毫秒之后呢,要是服務實例1沒有宕機但是業務執行還沒有結束,所釋放掉了就會導致線程問題,這個redisson是怎么解決的呢?這個就一定要實現自動延長鎖有效期的機制。
異步執行完lua腳本執行完成之后,設置了一個監聽器,來處理異步執行結束之后的一些工作。在操作完成之后會去執行operationComplete方法,先判斷這個異步操作有沒有執行成功,如果沒有成功,直接返回,如果執行成功了,就會同步獲取結果,如果ttlRemaining為null,則會執行一個定時調度的方法scheduleExpirationRenewal,回想一下之前的lua腳本,當加鎖邏輯
處理結束,返回了一個nil;如此說來 就一定會走定時任務了。來看下定時調度scheduleExpirationRenewal代碼


    
    
   
   
           
  1. private void scheduleExpirationRenewal(final long threadId) {
  2. if (expirationRenewalMap.containsKey(getEntryName())) {
  3. return;
  4. }
  5. Timeout task = commandExecutor.getConnectionManager().newTimeout( new TimerTask() {
  6. @Override
  7. public void run(Timeout timeout) throws Exception {
  8. RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  9. "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
  10. "redis.call('pexpire', KEYS[1], ARGV[1]); " +
  11. "return 1; " +
  12. "end; " +
  13. "return 0;",
  14. Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
  15. future.addListener( new FutureListener<Boolean>() {
  16. @Override
  17. public void operationComplete(Future<Boolean> future) throws Exception {
  18. expirationRenewalMap.remove(getEntryName());
  19. if (!future.isSuccess()) {
  20. log.error( "Can't update lock " + getName() + " expiration", future.cause());
  21. return;
  22. }
  23. if (future.getNow()) {
  24. // reschedule itself
  25. scheduleExpirationRenewal(threadId);
  26. }
  27. }
  28. });
  29. }
  30. }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
  31. if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
  32. task.cancel();
  33. }
  34. }

首先,會先判斷在expirationRenewalMap中是否存在了entryName,這是個map結構,主要還是判斷在這個服務實例中的加鎖客戶端的鎖key是否存在,如果已經存在了,就直接返回;第一次加鎖,肯定是不存在的,接下來就是搞了一個TimeTask,延遲internalLockLeaseTime/3之后執行,這里就用到了文章一開始就提到奇妙的變量,算下來就是大約10秒鍾執行一次,調用了一個異步執行的方法

如圖也是調用異步執行了一段lua腳本,首先判斷這個鎖key的map結構中是否存在對應的key8a9649f5-f5b5-48b4-beaa-d0c24855f9ab:anyLock:1,如果存在,就直接調用pexpire命令設置鎖key的過期時間,默認30000毫秒。

OK,現在思路就清晰了,在上面任務調度的方法中,也是異步執行並且設置了一個監聽器,在操作執行成功之后,會回調這個方法,如果調用失敗會打一個錯誤日志並返回,更新鎖過期時間失敗;然后獲取異步執行的結果,如果為true,就會調用本身,如此說來又會延遲10秒鍾去執行這段邏輯,所以,這段邏輯在你成功獲取到鎖之后,會每隔十秒鍾去執行一次,並且,在鎖key還沒有失效的情況下,會把鎖的過期時間繼續延長到30000毫秒,也就是說只要這台服務實例沒有掛掉,並且沒有主動釋放鎖,看門狗都會每隔十秒給你續約一下,保證鎖一直在你手中。完美的操作。
到現在來說,加鎖,鎖自動延長過期時間,都OK了,然后就是說在你執行業務,持有鎖的這段時間,別的服務實例來嘗試加鎖又會發生什么情況呢?或者當前客戶端的別的線程來獲取鎖呢?很顯然,肯定會阻塞住,我們來通過代碼看看是怎么做到的。還是把眼光放到之前分析的那段加鎖lua代碼上,當加鎖的鎖key存在的時候並且鎖key對應的map結構中當前客戶端的唯一key也存在時,會去調用hincrby命令,將唯一key的值自增一,並且會pexpire設置key的過期時間為30000毫秒,然后返回nil,可以想象這里也是加鎖成功的,也會繼續去執行定時調度任務,完成鎖key過期時間的續約,這里呢,就實現了鎖的可重入性。
那么當以上這種情況也沒有發生呢,這里就會直接返回當前鎖的剩余有效期,相應的也不會去執行續約邏輯。此時一直返回到上面的方法,如果加鎖成功就直接返回;否則就會進入一個死循環,去嘗試加鎖,並且也會在等待一段時間之后一直循環嘗試加鎖,阻塞住,知道第一個服務實例釋放鎖。對於不同的服務實例嘗試會獲取一把鎖,也和上面的邏輯類似,都是這樣實現了鎖的互斥。

緊接着,我們來看看鎖釋放的邏輯,其實也很簡單,調用了lock.unlock()方法,跟着代碼走流程發現,也是異步調用了一段lua腳本,lua腳本,應該就比較清晰,也就是通過判斷鎖key是否存在,如果不存在直接返回;否則就會判斷當前客戶端對應的唯一key的值是否存在,如果不存在就會返回nil;否則,值自增-1,判斷唯一key的值是否大於零,如果大於零,則返回0;否則刪除當前鎖key,並返回1;返回到上一層方法,也是針對返回值進行了操作,如果返回值是1,則會去取消之前的定時續約任務,如果失敗了,則會做一些類似設置狀態的操作,這一些和解鎖邏輯也沒有什么關系,可以不去看他。

解鎖流程源碼

解鎖流程相對比較簡單,完全就是執行解鎖Lua腳本,無額外的代碼邏輯,直接看org.redisson.RedissonLock#unlock代碼


    
    
   
   
           
  1. @Override
  2. public void unlock() {
  3. // 執行解鎖Lua腳本,這里傳入線程id,是為了保證加鎖和解鎖是同一個線程,避免誤解鎖其他線程占有的鎖
  4. Boolean opStatus = get(unlockInnerAsync(Thread.currentThread().getId()));
  5. if (opStatus == null) {
  6. throw new IllegalMonitorStateException( "attempt to unlock lock, not locked by current thread by node id: "
  7. + id + " thread-id: " + Thread.currentThread().getId());
  8. }
  9. if (opStatus) {
  10. cancelExpirationRenewal();
  11. }
  12. }
  13. // 見org.redisson.RedissonLock#unlockInnerAsync
  14. protected RFuture<Boolean> unlockInnerAsync(long threadId) {
  15. return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
  16. "if (redis.call('exists', KEYS[1]) == 0) then " +
  17. "redis.call('publish', KEYS[2], ARGV[1]); " +
  18. "return 1; " +
  19. "end;" +
  20. "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
  21. "return nil;" +
  22. "end; " +
  23. "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
  24. "if (counter > 0) then " +
  25. "redis.call('pexpire', KEYS[1], ARGV[2]); " +
  26. "return 0; " +
  27. "else " +
  28. "redis.call('del', KEYS[1]); " +
  29. "redis.call('publish', KEYS[2], ARGV[1]); " +
  30. "return 1; "+
  31. "end; " +
  32. "return nil;",
  33. Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
  34. }

 

c.加鎖&解鎖流程串起來

上面結合Lua腳本和源碼,分別分析了加鎖流程和解鎖流程。下面升級下挑戰難度,模擬下多個線程爭搶鎖會是怎樣的流程。示意圖如下,比較關鍵的三處已用紅色字體標注。

 

概括下整個流程

1、線程A和線程B兩個線程同時爭搶鎖。線程A很幸運,最先搶到了鎖。線程B在獲取鎖失敗后,並未放棄希望,而是主動訂閱了解鎖消息,然后再嘗試獲取鎖,順便看看沒有搶到的這把鎖還有多久就過期,線程B就按需阻塞等鎖釋放。

2、線程A拿着鎖干完了活,自覺釋放了持有的鎖,於此同時廣播了解鎖消息,通知其他搶鎖的線程再來槍;

3、解鎖消息的監聽者LockPubSub收到消息后,釋放自己持有的信號量;線程B就瞬間從阻塞中被喚醒了,接着再搶鎖,這次終於搶到鎖了!后面再按部就班,干完活,解鎖

其他料

Q1:訂閱頻道名稱(如:redisson_lock__channel:{my_first_lock_name})為什么有大括號? 
A: 
1.在redis集群方案中,如果Lua腳本涉及多個key的操作,則需限制這些key在同一個slot中,才能保障Lua腳本執行的原子性。否則運行會報錯Lua script attempted to access a non local key in a cluster node . channel; 
2.HashTag是用{}包裹key的一個子串,若設置了HashTag,集群會根據HashTag決定key分配到哪個slot;HashTag不支持嵌套,只有第一個左括號{和第一個右括號}里面的內容才當做HashTag參與slot計算;通常,客戶端都會封裝這個計算邏輯。


    
    
   
   
           
  1. // 見org.redisson.cluster.ClusterConnectionManager#calcSlot
  2. @Override
  3. public int calcSlot(String key) {
  4. if (key == null) {
  5. return 0;
  6. }
  7. int start = key.indexOf( '{');
  8. if (start != - 1) {
  9. int end = key.indexOf( '}');
  10. key = key.substring(start+ 1, end);
  11. }
  12. int result = CRC16.crc16(key.getBytes()) % MAX_SLOT;
  13. log.debug( "slot {} for {}", result, key);
  14. return result;
  15. }

3.在解鎖Lua腳本中,操作了兩個key:一個是鎖名my_lock_name,一個是解鎖消息發布訂閱頻道redisson_lock__channel:{my_first_lock_name},按照上面slot計算方式,兩個key都會按照內容my_first_lock_name來計算,故能保證落到同一個slot

Q2:redisson代碼幾乎都是以Lua腳本方式與redis服務端交互,如何跟蹤這些腳本執行過程? 
A:啟動一個redis客戶端終端,執行monitor命令以便在終端上實時打印 redis 服務器接收到的命令;然后debug執行redisson加鎖/解鎖測試用例,即可看到代碼運行過程中實際執行了哪些Lua腳本

eg:上面整體流程示意圖的測試用例位:


    
    
   
   
           
  1. @RunWith(SpringRunner.class)
  2. @SpringBootTest
  3. public class RedissonDistributedLockerTest {
  4. private static final Logger log = LoggerFactory.getLogger(RedissonDistributedLocker.class);
  5. @Resource
  6. private DistributedLocker distributedLocker;
  7. private static final ExecutorService executorServiceB = Executors.newSingleThreadExecutor();
  8. private static final ExecutorService executorServiceC = Executors.newSingleThreadExecutor();
  9. @Test
  10. public void tryLockUnlockCost() throws Exception {
  11. StopWatch stopWatch = new StopWatch( "加鎖解鎖耗時統計");
  12. stopWatch.start();
  13. for ( int i = 0; i < 10000; i++) {
  14. String key = "mock-key:" + UUID.randomUUID().toString().replace( "-", "");
  15. Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
  16. Assert.assertTrue(optLocked.isPresent());
  17. optLocked.get().unlock();
  18. }
  19. stopWatch.stop();
  20. log.info(stopWatch.prettyPrint());
  21. }
  22. @Test
  23. public void tryLock() throws Exception {
  24. String key = "mock-key:" + UUID.randomUUID().toString().replace( "-", "");
  25. Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
  26. Assert.assertTrue(optLocked.isPresent());
  27. Optional<LockResource> optLocked2 = distributedLocker.tryLock(key, 600000, 600000);
  28. Assert.assertTrue(optLocked2.isPresent());
  29. optLocked.get().unlock();
  30. }
  31. /**
  32. * 模擬2個線程爭搶鎖:A先獲取到鎖,A釋放鎖后,B再獲得鎖
  33. */
  34. @Test
  35. public void tryLock2() throws Exception {
  36. String key = "mock-key:" + UUID.randomUUID().toString().replace( "-", "");
  37. CountDownLatch countDownLatch = new CountDownLatch( 1);
  38. Future<Optional<LockResource>> submit = executorServiceB.submit(() -> {
  39. countDownLatch.await();
  40. log.info( "B嘗試獲得鎖:thread={}", currentThreadId());
  41. return distributedLocker.tryLock(key, 600000, 600000);
  42. }
  43. );
  44. log.info( "A嘗試獲得鎖:thread={}", currentThreadId());
  45. Optional<LockResource> optLocked = distributedLocker.tryLock(key, 300000, 600000);
  46. Assert.assertTrue(optLocked.isPresent());
  47. log.info( "A已獲得鎖:thread={}", currentThreadId());
  48. countDownLatch.countDown();
  49. optLocked.get().unlock();
  50. log.info( "A已釋放鎖:thread={}", currentThreadId());
  51. Optional<LockResource> lockResource2 = submit.get();
  52. Assert.assertTrue(lockResource2.isPresent());
  53. executorServiceB.submit(() -> {
  54. log.info( "B已獲得鎖:thread={}", currentThreadId());
  55. lockResource2.get().unlock();
  56. log.info( "B已釋放鎖:thread={}", currentThreadId());
  57. });
  58. }
  59. /**
  60. * 模擬3個線程爭搶鎖:A先獲取到鎖,A釋放鎖后,B和C同時爭搶鎖
  61. */
  62. @Test
  63. public void tryLock3() throws Exception {
  64. String key = "mock-key:" + UUID.randomUUID().toString().replace( "-", "");
  65. log.info( "A嘗試獲得鎖:thread={}", currentThreadId());
  66. Optional<LockResource> optLocked = distributedLocker.tryLock(key, 600000, 600000);
  67. if (optLocked.isPresent()) {
  68. log.info( "A已獲得鎖:thread={}", currentThreadId());
  69. }
  70. Assert.assertTrue(optLocked.isPresent());
  71. CyclicBarrier cyclicBarrier = new CyclicBarrier( 2);
  72. Future<Optional<LockResource>> submitB = executorServiceB.submit(() -> {
  73. cyclicBarrier.await();
  74. log.info( "B嘗試獲得鎖:thread={}", currentThreadId());
  75. return distributedLocker.tryLock(key, 600000, 600000);
  76. }
  77. );
  78. Future<Optional<LockResource>> submitC = executorServiceC.submit(() -> {
  79. cyclicBarrier.await();
  80. log.info( "C嘗試獲得鎖:thread={}", currentThreadId());
  81. return distributedLocker.tryLock(key, 600000, 600000);
  82. }
  83. );
  84. optLocked.get().unlock();
  85. log.info( "A已釋放鎖:thread={}", currentThreadId());
  86. CountDownLatch countDownLatch = new CountDownLatch( 2);
  87. executorServiceB.submit(() -> {
  88. log.info( "B已獲得鎖:thread={}", currentThreadId());
  89. try {
  90. submitB.get().get().unlock();
  91. } catch (InterruptedException | ExecutionException e) {
  92. e.printStackTrace();
  93. }
  94. log.info( "B已釋放鎖:thread={}", currentThreadId());
  95. countDownLatch.countDown();
  96. });
  97. executorServiceC.submit(() -> {
  98. log.info( "C已獲得鎖:thread={}", currentThreadId());
  99. try {
  100. submitC.get().get().unlock();
  101. } catch (InterruptedException | ExecutionException e) {
  102. e.printStackTrace();
  103. }
  104. log.info( "C已釋放鎖:thread={}", currentThreadId());
  105. countDownLatch.countDown();
  106. });
  107. countDownLatch.await();
  108. }
  109. private static Long currentThreadId() {
  110. return Thread.currentThread().getId();
  111. }
  112. @Test
  113. public void tryLockWaitTimeout() throws Exception {
  114. String key = "mock-key:" + UUID.randomUUID().toString();
  115. Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 2000);
  116. Assert.assertTrue(optLocked.isPresent());
  117. Optional<LockResource> optLockResource = CompletableFuture.supplyAsync(() -> {
  118. long now = System.currentTimeMillis();
  119. Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 1000, 10);
  120. long cost = System.currentTimeMillis() - now;
  121. log.info( "cost={}", cost);
  122. return optLockedAgain;
  123. }).exceptionally(th -> {
  124. log.error( "Exception: ", th);
  125. return Optional.empty();
  126. }).join();
  127. Assert.assertTrue(!optLockResource.isPresent());
  128. }
  129. @Test
  130. public void tryLockWithLeaseTime() throws Exception {
  131. String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
  132. Optional<LockResource> optLocked = distributedLocker.tryLock(key, 3000, 1000);
  133. Assert.assertTrue(optLocked.isPresent());
  134. // 可重入
  135. Optional<LockResource> optLockedAgain = distributedLocker.tryLock(key, 3000, 1000);
  136. Assert.assertTrue(optLockedAgain.isPresent());
  137. }
  138. /**
  139. * 模擬1000個並發請求槍一把鎖
  140. */
  141. @Test
  142. public void tryLockWithLeaseTimeOnMultiThread() throws Exception {
  143. int totalThread = 1000;
  144. String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
  145. AtomicInteger tryAcquireLockTimes = new AtomicInteger( 0);
  146. AtomicInteger acquiredLockTimes = new AtomicInteger( 0);
  147. ExecutorService executor = Executors.newFixedThreadPool(totalThread);
  148. for ( int i = 0; i < totalThread; i++) {
  149. executor.submit( new Runnable() {
  150. @Override
  151. public void run() {
  152. tryAcquireLockTimes.getAndIncrement();
  153. Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10, 10000);
  154. if (optLocked.isPresent()) {
  155. acquiredLockTimes.getAndIncrement();
  156. }
  157. }
  158. });
  159. }
  160. executor.awaitTermination( 15, TimeUnit.SECONDS);
  161. Assert.assertTrue(tryAcquireLockTimes.get() == totalThread);
  162. Assert.assertTrue(acquiredLockTimes.get() == 1);
  163. }
  164. @Test
  165. public void tryLockWithLeaseTimeOnMultiThread2() throws Exception {
  166. int totalThread = 100;
  167. String key = "mock-key-with-leaseTime:" + UUID.randomUUID().toString();
  168. AtomicInteger tryAcquireLockTimes = new AtomicInteger( 0);
  169. AtomicInteger acquiredLockTimes = new AtomicInteger( 0);
  170. ExecutorService executor = Executors.newFixedThreadPool(totalThread);
  171. for ( int i = 0; i < totalThread; i++) {
  172. executor.submit( new Runnable() {
  173. @Override
  174. public void run() {
  175. long now = System.currentTimeMillis();
  176. Optional<LockResource> optLocked = distributedLocker.tryLock(key, 10000, 5);
  177. long cost = System.currentTimeMillis() - now;
  178. log.info( "tryAcquireLockTimes={}||wait={}", tryAcquireLockTimes.incrementAndGet(), cost);
  179. if (optLocked.isPresent()) {
  180. acquiredLockTimes.getAndIncrement();
  181. // 主動釋放鎖
  182. optLocked.get().unlock();
  183. }
  184. }
  185. });
  186. }
  187. executor.awaitTermination( 20, TimeUnit.SECONDS);
  188. log.info( "tryAcquireLockTimes={}, acquireLockTimes={}", tryAcquireLockTimes.get(), acquiredLockTimes.get());
  189. Assert.assertTrue(tryAcquireLockTimes.get() == totalThread);
  190. Assert.assertTrue(acquiredLockTimes.get() == totalThread);
  191. }
  192. }
  193. public interface DistributedLocker {
  194. Optional<LockResource> tryLock(String lockKey, int waitTime);
  195. Optional<LockResource> tryLock(String lockKey, int waitTime, int leaseTime);
  196. }
  197. public interface LockResource {
  198. void unlock();
  199. }

執行的Lua腳本如下:

加鎖:redissonClient.getLock("my_first_lock_name").tryLock(600000, 600000); 
解鎖:redissonClient.getLock("my_first_lock_name").unlock();


    
    
   
   
           
  1. # 線程A
  2. ## 1.1. 1嘗試獲取鎖 -> 成功
  3. 1568357723.205362 [ 0 127.0. 0.1: 56419] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
  4. 1568357723.205452 [ 0 lua] "exists" "my_first_lock_name"
  5. 1568357723.208858 [ 0 lua] "hset" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1" "1"
  6. 1568357723.208874 [ 0 lua] "pexpire" "my_first_lock_name" "600000"
  7. # 線程B
  8. ### 2.1. 1嘗試獲取鎖,未獲取到,返回鎖剩余過期時間
  9. 1568357773.338018 [ 0 127.0. 0.1: 56417] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
  10. 1568357773.338161 [ 0 lua] "exists" "my_first_lock_name"
  11. 1568357773.338177 [ 0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
  12. 1568357773.338197 [ 0 lua] "pttl" "my_first_lock_name"
  13. ## 2.1. 1.3 添加訂閱(非Lua腳本) -> 訂閱成功
  14. 1568357799.403341 [ 0 127.0. 0.1: 56421] "SUBSCRIBE" "redisson_lock__channel:{my_first_lock_name}"
  15. ## 2.1. 1.4 再次嘗試獲取鎖 -> 未獲取到,返回鎖剩余過期時間
  16. 1568357830.683631 [ 0 127.0. 0.1: 56418] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
  17. 1568357830.684371 [ 0 lua] "exists" "my_first_lock_name"
  18. 1568357830.684428 [ 0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
  19. 1568357830.684485 [ 0 lua] "pttl" "my_first_lock_name"
  20. # 線程A
  21. ## 3.1. 1 釋放鎖並廣播解鎖消息, 0代表解鎖消息
  22. 1568357922.122454 [ 0 127.0. 0.1: 56420] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "my_first_lock_name" "redisson_lock__channel:{my_first_lock_name}" "0" "30000" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
  23. 1568357922.123645 [ 0 lua] "exists" "my_first_lock_name"
  24. 1568357922.123701 [ 0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1"
  25. 1568357922.123741 [ 0 lua] "hincrby" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:1" "-1"
  26. 1568357922.123775 [ 0 lua] "del" "my_first_lock_name"
  27. 1568357922.123799 [ 0 lua] "publish" "redisson_lock__channel:{my_first_lock_name}" "0"
  28. # 線程B
  29. ## 監聽到解鎖消息消息 -> 釋放信號量,阻塞被解除; 4.1. 1.1 再次嘗試獲取鎖 -> 獲取成功
  30. 1568357975.015206 [ 0 127.0. 0.1: 56419] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('hset', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then redis.call('hincrby', KEYS[1], ARGV[2], 1); redis.call('pexpire', KEYS[1], ARGV[1]); return nil; end; return redis.call('pttl', KEYS[1]);" "1" "my_first_lock_name" "600000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
  31. 1568357975.015579 [ 0 lua] "exists" "my_first_lock_name"
  32. 1568357975.015633 [ 0 lua] "hset" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26" "1"
  33. 1568357975.015721 [ 0 lua] "pexpire" "my_first_lock_name" "600000"
  34. ## 4.1. 1.3 取消訂閱(非Lua腳本)
  35. 1568358031.185226 [ 0 127.0. 0.1: 56421] "UNSUBSCRIBE" "redisson_lock__channel:{my_first_lock_name}"
  36. # 線程B
  37. ## 5.1. 1 釋放鎖並廣播解鎖消息
  38. 1568358255.551896 [ 0 127.0. 0.1: 56417] "EVAL" "if (redis.call('exists', KEYS[1]) == 0) then redis.call('publish', KEYS[2], ARGV[1]); return 1; end;if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then return nil;end; local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call('pexpire', KEYS[1], ARGV[2]); return 0; else redis.call('del', KEYS[1]); redis.call('publish', KEYS[2], ARGV[1]); return 1; end; return nil;" "2" "my_first_lock_name" "redisson_lock__channel:{my_first_lock_name}" "0" "30000" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
  39. 1568358255.552125 [ 0 lua] "exists" "my_first_lock_name"
  40. 1568358255.552156 [ 0 lua] "hexists" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26"
  41. 1568358255.552200 [ 0 lua] "hincrby" "my_first_lock_name" "58c62432-bb74-4d14-8a00-9908cc8b828f:26" "-1"
  42. 1568358255.552258 [ 0 lua] "del" "my_first_lock_name"
  43. 1568358255.552304 [ 0 lua] "publish" "redisson_lock__channel:{my_first_lock_name}" "0"

需要特別注意的是,RedissonLock 同樣沒有解決 節點掛掉的時候,存在丟失鎖的風險的問題。而現實情況是有一些場景無法容忍的,所以 Redisson 提供了實現了redlock算法的 RedissonRedLock,RedissonRedLock 真正解決了單點失敗的問題,代價是需要額外的為 RedissonRedLock 搭建Redis環境。

所以,如果業務場景可以容忍這種小概率的錯誤,則推薦使用 RedissonLock, 如果無法容忍,則推薦使用 RedissonRedLock。

redlock算法

Redis 官網對 redLock 算法的介紹大致如下:

The Redlock algorithm

在分布式版本的算法里我們假設我們有N個Redis master節點,這些節點都是完全獨立的,我們不用任何復制或者其他隱含的分布式協調機制。之前我們已經描述了在Redis單實例下怎么安全地獲取和釋放鎖。我們確保將在每(N)個實例上使用此方法獲取和釋放鎖。在我們的例子里面我們把N設成5,這是一個比較合理的設置,所以我們需要在5台機器上面或者5台虛擬機上面運行這些實例,這樣保證他們不會同時都宕掉。為了取到鎖,客戶端應該執行以下操作:

  1. 獲取當前Unix時間,以毫秒為單位。

  2. 依次嘗試從5個實例,使用相同的key和具有唯一性的value(例如UUID)獲取鎖。當向Redis請求獲取鎖時,客戶端應該設置一個嘗試從某個Reids實例獲取鎖的最大等待時間(超過這個時間,則立馬詢問下一個實例),這個超時時間應該小於鎖的失效時間。例如你的鎖自動失效時間為10秒,則超時時間應該在5-50毫秒之間。這樣可以避免服務器端Redis已經掛掉的情況下,客戶端還在死死地等待響應結果。如果服務器端沒有在規定時間內響應,客戶端應該盡快嘗試去另外一個Redis實例請求獲取鎖。

  3. 客戶端使用當前時間減去開始獲取鎖時間(步驟1記錄的時間)就得到獲取鎖消耗的時間。當且僅當從大多數(N/2+1,這里是3個節點)的Redis節點都取到鎖,並且使用的總耗時小於鎖失效時間時,鎖才算獲取成功。

  4. 如果取到了鎖,key的真正有效時間 = 有效時間(獲取鎖時設置的key的自動超時時間) - 獲取鎖的總耗時(詢問各個Redis實例的總耗時之和)(步驟3計算的結果)。

  5. 如果因為某些原因,最終獲取鎖失敗(即沒有在至少 “N/2+1 ”個Redis實例取到鎖或者“獲取鎖的總耗時”超過了“有效時間”),客戶端應該在所有的Redis實例上進行解鎖(即便某些Redis實例根本就沒有加鎖成功,這樣可以防止某些節點獲取到鎖但是客戶端沒有得到響應而導致接下來的一段時間不能被重新獲取鎖)。

用 Redisson 實現分布式鎖(紅鎖 RedissonRedLock)及源碼分析(實現三)

這里以三個單機模式為例,需要特別注意的是他們完全互相獨立,不存在主從復制或者其他集群協調機制。


    
    
   
   
           
  1. Config config1 = new Config();
  2. config1.useSingleServer().setAddress( "redis://172.0.0.1:5378").setPassword( "a123456").setDatabase( 0);
  3. RedissonClient redissonClient1 = Redisson.create(config1);
  4. Config config2 = new Config();
  5. config2.useSingleServer().setAddress( "redis://172.0.0.1:5379").setPassword( "a123456").setDatabase( 0);
  6. RedissonClient redissonClient2 = Redisson.create(config2);
  7. Config config3 = new Config();
  8. config3.useSingleServer().setAddress( "redis://172.0.0.1:5380").setPassword( "a123456").setDatabase( 0);
  9. RedissonClient redissonClient3 = Redisson.create(config3);
  10. /**
  11. * 獲取多個 RLock 對象
  12. */
  13. RLock lock1 = redissonClient1.getLock(lockKey);
  14. RLock lock2 = redissonClient2.getLock(lockKey);
  15. RLock lock3 = redissonClient3.getLock(lockKey);
  16. /**
  17. * 根據多個 RLock 對象構建 RedissonRedLock (最核心的差別就在這里)
  18. */
  19. RedissonRedLock redLock = new RedissonRedLock(lock1, lock2, lock3);
  20. try {
  21. /**
  22. * 4.嘗試獲取鎖
  23. * waitTimeout 嘗試獲取鎖的最大等待時間,超過這個值,則認為獲取鎖失敗
  24. * leaseTime 鎖的持有時間,超過這個時間鎖會自動失效(值應設置為大於業務處理的時間,確保在鎖有效期內業務能處理完)
  25. */
  26. boolean res = redLock.tryLock(( long)waitTimeout, ( long)leaseTime, TimeUnit.SECONDS);
  27. if (res) {
  28. //成功獲得鎖,在這里處理業務
  29. }
  30. } catch (Exception e) {
  31. throw new RuntimeException( "aquire lock fail");
  32. } finally{
  33. //無論如何, 最后都要解鎖
  34. redLock.unlock();
  35. }

最核心的變化就是需要構建多個 RLock ,然后根據多個 RLock 構建成一個 RedissonRedLock,因為 redLock 算法是建立在多個互相獨立的 Redis 環境之上的(為了區分可以叫為 Redission node),Redission node 節點既可以是單機模式(single),也可以是主從模式(master/salve),哨兵模式(sentinal),或者集群模式(cluster)。這就意味着,不能跟以往這樣只搭建 1個 cluster、或 1個 sentinel 集群,或是1套主從架構就了事了,需要為 RedissonRedLock 額外搭建多幾套獨立的 Redission 節點。 比如可以搭建3個 或者5個 Redission節點,具體可看視資源及業務情況而定。

下圖是一個利用多個 Redission node 最終 組成 RedLock分布式鎖的例子,需要特別注意的是每個 Redission node 是互相獨立的,不存在任何復制或者其他隱含的分布式協調機制。


# Redisson 實現redlock算法源碼分析(RedLock)

加鎖核心代碼

org.redisson.RedissonMultiLock#tryLock


    
    
   
   
           
  1. public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
  2. long newLeaseTime = - 1;
  3. if (leaseTime != - 1) {
  4. newLeaseTime = unit.toMillis(waitTime)* 2;
  5. }
  6. long time = System.currentTimeMillis();
  7. long remainTime = - 1;
  8. if (waitTime != - 1) {
  9. remainTime = unit.toMillis(waitTime);
  10. }
  11. long lockWaitTime = calcLockWaitTime(remainTime);
  12. /**
  13. * 1. 允許加鎖失敗節點個數限制(N-(N/2+1))
  14. */
  15. int failedLocksLimit = failedLocksLimit();
  16. /**
  17. * 2. 遍歷所有節點通過EVAL命令執行lua加鎖
  18. */
  19. List<RLock> acquiredLocks = new ArrayList<>(locks.size());
  20. for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
  21. RLock lock = iterator.next();
  22. boolean lockAcquired;
  23. /**
  24. * 3.對節點嘗試加鎖
  25. */
  26. try {
  27. if (waitTime == - 1 && leaseTime == - 1) {
  28. lockAcquired = lock.tryLock();
  29. } else {
  30. long awaitTime = Math.min(lockWaitTime, remainTime);
  31. lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
  32. }
  33. } catch (RedisResponseTimeoutException e) {
  34. // 如果拋出這類異常,為了防止加鎖成功,但是響應失敗,需要解鎖所有節點
  35. unlockInner(Arrays.asList(lock));
  36. lockAcquired = false;
  37. } catch (Exception e) {
  38. // 拋出異常表示獲取鎖失敗
  39. lockAcquired = false;
  40. }
  41. if (lockAcquired) {
  42. /**
  43. *4. 如果獲取到鎖則添加到已獲取鎖集合中
  44. */
  45. acquiredLocks.add(lock);
  46. } else {
  47. /**
  48. * 5. 計算已經申請鎖失敗的節點是否已經到達 允許加鎖失敗節點個數限制 (N-(N/2+1))
  49. * 如果已經到達, 就認定最終申請鎖失敗,則沒有必要繼續從后面的節點申請了
  50. * 因為 Redlock 算法要求至少N/2+1 個節點都加鎖成功,才算最終的鎖申請成功
  51. */
  52. if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
  53. break;
  54. }
  55. if (failedLocksLimit == 0) {
  56. unlockInner(acquiredLocks);
  57. if (waitTime == - 1 && leaseTime == - 1) {
  58. return false;
  59. }
  60. failedLocksLimit = failedLocksLimit();
  61. acquiredLocks.clear();
  62. // reset iterator
  63. while (iterator.hasPrevious()) {
  64. iterator.previous();
  65. }
  66. } else {
  67. failedLocksLimit--;
  68. }
  69. }
  70. /**
  71. * 6.計算 目前從各個節點獲取鎖已經消耗的總時間,如果已經等於最大等待時間,則認定最終申請鎖失敗,返回false
  72. */
  73. if (remainTime != - 1) {
  74. remainTime -= System.currentTimeMillis() - time;
  75. time = System.currentTimeMillis();
  76. if (remainTime <= 0) {
  77. unlockInner(acquiredLocks);
  78. return false;
  79. }
  80. }
  81. }
  82. if (leaseTime != - 1) {
  83. List<RFuture<Boolean>> futures = new ArrayList<>(acquiredLocks.size());
  84. for (RLock rLock : acquiredLocks) {
  85. RFuture<Boolean> future = ((RedissonLock) rLock).expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
  86. futures.add(future);
  87. }
  88. for (RFuture<Boolean> rFuture : futures) {
  89. rFuture.syncUninterruptibly();
  90. }
  91. }
  92. /**
  93. * 7.如果邏輯正常執行完則認為最終申請鎖成功,返回true
  94. */
  95. return true;
  96. }

參考文獻

[1]Distributed locks with Redis

[2]Distributed locks with Redis 中文版

[3]SET - Redis

[4]EVAL command

[5] Redisson

[6]Redis分布式鎖的正確實現方式

[7]Redlock實現分布式鎖

[8]Redisson實現Redis分布式鎖


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM