RLock的代碼及學習


1:高性能鎖

1.1 互斥

在分布式高並發的條件下,需要保證,同一時刻只能有一個線程獲得鎖。

1.2 防止死鎖

在分布式高並發的條件下,比如有個線程獲得鎖的同時,還沒有來得及去釋放鎖,就因為系統故障或者其它原因使它無法執行釋放鎖的命令,導致其它線程都無法獲得鎖,造成死鎖。

所以分布式非常有必要設置鎖的有效時間,確保系統出現故障后,在一定時間內能夠主動去釋放鎖,避免造成死鎖的情況。

1.3 性能

對於訪問量大的共享資源,需要考慮減少鎖等待的時間,避免導致大量線程阻塞。

所以在鎖的設計時,需要考慮兩點。

  • 1、鎖的顆粒度要盡量小。比如你要通過鎖來減庫存,那這個鎖的名稱你可以設置成是商品的ID,而不是任取名稱。這樣這個鎖只對當前商品有效,鎖的顆粒度小。

  • 2、鎖的范圍盡量要小`。比如只要鎖2行代碼就可以解決問題的,那就不要去鎖10行代碼了。

1.4 可重入

ReentrantLock是可重入鎖,那它的特點就是:同一個線程可以重復拿到同一個資源的鎖。重入鎖非常有利於資源的高效利用。

2:Redission的原理分析

img

2.1 加鎖機制

線程去獲取鎖,獲取成功: 執行lua腳本,保存數據到redis數據庫。

線程去獲取鎖,獲取失敗: 一直通過while循環嘗試獲取鎖,獲取成功后,執行lua腳本,保存數據到redis數據庫。

2.2 watch dog自動延期機制

在一個分布式環境下,假如一個線程獲得鎖后,突然服務器宕機了,那么這個時候在一定時間后這個鎖會自動釋放,你也可以設置鎖的有效時間(不設置默認30秒),這樣的目的主要是防止死鎖的發生。

2.3 為啥要用lua腳本

主要是如果你的業務邏輯復雜的話,通過封裝在lua腳本中發送給redis,而且redis是單線程的,這樣就保證這段復雜業務邏輯執行的原子性

在分布式鎖中,加鎖的操作需要多條命令,使用lua腳本保證了原子性。

2.4 可重入加鎖機制

Redisson可以實現可重入加鎖機制基於以下兩點進行實現:

1、Redis存儲鎖的數據類型是 Hash類型
2、Hash數據類型的key值包含了當前線程信息。

使用的是數據類型是Hash類型,Hash類型相當於我們java的 <key,<key1,value>> 類型,這里key是指 加鎖的key。例如:key1值為078e44a3-5f95-4e24-b6aa-80684655a15a:45它的組成是:

guid + 當前線程的ID。后面的value就是重入的次數。

3:Redis分布式鎖的缺點

(1):如果使用的是單Master節點,那么可能會因為主備節點切換時候,鎖數據沒有同步完成就出現一次加鎖,可能出現問題。

4:代碼分析

4.1 RLock接口

其中繼承的接口Lock,是JUC中的接口,具有Lock中的能力。同時它還有很多新特性:強制鎖釋放,帶有效期的鎖。

public interface RLock extends Lock, RLockAsync
public interface RRLock {
  //----------------------Lock接口方法-----------------------

  /**
    * 加鎖 鎖的有效期默認30秒
    */
  void lock();
  /**
    * tryLock()方法是有返回值的,它表示用來嘗試獲取鎖,如果獲取成功,則返回true,如果獲取失敗(即鎖已被其他線程獲取),則返回false .
    */
  boolean tryLock();
  /**
    * tryLock(long time, TimeUnit unit)方法和tryLock()方法是類似的,只不過區別在於這個方法在拿不到鎖時會等待一定的時間,
    * 在時間期限之內如果還拿不到鎖,就返回false。如果如果一開始拿到鎖或者在等待期間內拿到了鎖,則返回true。
    *
    * @param time 等待時間
    * @param unit 時間單位 小時、分、秒、毫秒等
    */
  boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
  /**
    * 解鎖
    */
  void unlock();
  /**
    * 中斷鎖 表示該鎖可以被中斷 假如A和B同時調這個方法,A獲取鎖,B為獲取鎖,那么B線程可以通過
    * Thread.currentThread().interrupt(); 方法真正中斷該線程
    */
  void lockInterruptibly();

  //----------------------RLock接口方法-----------------------
  /**
    * 加鎖 上面是默認30秒這里可以手動設置鎖的有效時間
    *
    * @param leaseTime 鎖有效時間
    * @param unit     時間單位 小時、分、秒、毫秒等
    */
  void lock(long leaseTime, TimeUnit unit);
  /**
    * 這里比上面多一個參數,多添加一個鎖的有效時間
    *
    * @param waitTime 等待時間
    * @param leaseTime 鎖有效時間
    * @param unit     時間單位 小時、分、秒、毫秒等
    */
  boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
  /**
    * 檢驗該鎖是否被線程使用,如果被使用返回True
    */
  boolean isLocked();
  /**
    * 檢查當前線程是否獲得此鎖(這個和上面的區別就是該方法可以判斷是否當前線程獲得此鎖,而不是此鎖是否被線程占有)
    * 這個比上面那個實用
    */
  boolean isHeldByCurrentThread();
  /**
    * 中斷鎖 和上面中斷鎖差不多,只是這里如果獲得鎖成功,添加鎖的有效時間
    * @param leaseTime 鎖有效時間
    * @param unit       時間單位 小時、分、秒、毫秒等
    */
  void lockInterruptibly(long leaseTime, TimeUnit unit);  
}
4.2:RLock實現類RedissonLock
public class RedissonLock extends RedissonExpirable implements RLock
4.2.1 void lock()方法
  • 1:RedissionLock的Lock方法

    @Override
    public void lock() {
      try {
          lock(-1, null, false);
      } catch (InterruptedException e) {
          throw new IllegalStateException();
      }
    }
  • 2:Lock帶參數

    private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
          long threadId = Thread.currentThread().getId();
          //1:開始進行嘗試獲取鎖
          Long ttl = tryAcquire(leaseTime, unit, threadId);
          // lock acquired
          if (ttl == null) {
              return;
          }
    //2:訂閱鎖的釋放信息
          RFuture<RedissonLockEntry> future = subscribe(threadId);
          //3:等待鎖的釋放
          if (interruptibly) {
              commandExecutor.syncSubscriptionInterrupted(future);
          } else {
              commandExecutor.syncSubscription(future);
          }

          try {
              while (true) {
              //4:循環多次嘗試獲取鎖
                  ttl = tryAcquire(leaseTime, unit, threadId);
                  // lock acquired
                  //5:獲取鎖成功,返回
                  if (ttl == null) {
                      break;
                  }

                  // waiting for message
                  //6:休眠等待鎖被釋放的信號,釋放被喚醒
                  if (ttl >= 0) {
                      try {
                          future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                      } catch (InterruptedException e) {
                          if (interruptibly) {
                              throw e;
                          }
                          future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                      }
                  } else {
                      if (interruptibly) {
                          future.getNow().getLatch().acquire();
                      } else {
                          future.getNow().getLatch().acquireUninterruptibly();
                      }
                  }
              }
          } finally {
          //7:退訂鎖的信息
              unsubscribe(future, threadId);
          }
    //       get(lockAsync(leaseTime, unit));
      }
  • 3:tryAcquire

    異步嘗試進行加鎖,嘗試加鎖的時候leaseTime為-1。通常如果客戶端沒有加鎖成功,則會進行阻塞,leaseTime為鎖釋放的時間。

    private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
      //使用get方法,獲取RFuture數據,沒有返回則阻塞
      return get(tryAcquireAsync(leaseTime, unit, threadId));
    }
    private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
      //進行加鎖,返回RFuture
      if (leaseTime != -1) {
      //異步進行獲取鎖的操作
          return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
      }
      RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
      ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
          if (e != null) {
              return;
          }

          // lock acquired
          if (ttlRemaining == null) {
          //進行看門狗的處理,見4.2.3
              scheduleExpirationRenewal(threadId);
          }
      });
      return ttlRemainingFuture;
    }

    生成lua腳本:

    <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
      internalLockLeaseTime = unit.toMillis(leaseTime);

      return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "return redis.call('pttl', KEYS[1]);",
                  Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    }

上述lua腳本:

img

總結:

  • 1:加鎖,使用Lock沒有攜帶過期時間,但是會使用一個默認值: lockWatchdogTimeout = 30 * 1000;

  • 2:和ReentranLock一樣,在進行獲取鎖的時候,首先會先嘗試一次獲取鎖的操作,如果獲取鎖失敗使用信號量的方式,訂閱鎖的釋放信息,有了釋放鎖的信息,則進行嘗試加鎖,依此循環。

  • 3:根據鎖釋放的邏輯,鎖釋放的時候會發布鎖解除的消息,應該在2中訂閱對應,在釋放鎖的時候釋放鎖的信息。

4.2.2 解鎖
@Override
public void unlock() {
  try {
      get(unlockAsync(Thread.currentThread().getId()));
  } catch (RedisException e) {
      if (e.getCause() instanceof IllegalMonitorStateException) {
          throw (IllegalMonitorStateException) e.getCause();
      } else {
          throw e;
      }
  }
  • 1:unlockAsync方法

    @Override
    public RFuture<Void> unlockAsync(long threadId) {
      RPromise<Void> result = new RedissonPromise<Void>();
      //核心的解鎖流程
      RFuture<Boolean> future = unlockInnerAsync(threadId);
    //后取是解除看門狗進程
      future.onComplete((opStatus, e) -> {
          if (e != null) {
              cancelExpirationRenewal(threadId);
              result.tryFailure(e);
              return;
          }

          if (opStatus == null) {
              IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                      + id + " thread-id: " + threadId);
              result.tryFailure(cause);
              return;
          }
           
          cancelExpirationRenewal(threadId);
          result.trySuccess(null);
      });

      return result;
    }
  • 2:解鎖unlockInnerAsync

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
      /**
      *(1):判斷key是否存在,不存在即釋放成功;
      *(2):減少鎖定值1,減去后如果小於等於0,則釋放鎖,並且進行事件通知鎖釋放。
      */
      return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
              "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                  "return nil;" +
              "end; " +
              "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
              "if (counter > 0) then " +
                  "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                  "return 0; " +
              "else " +
                  "redis.call('del', KEYS[1]); " +
                  "redis.call('publish', KEYS[2], ARGV[1]); " +
                  "return 1; "+
              "end; " +
              "return nil;",
              Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

    }
4.2.3 看門狗實現

在方法:tryAcquireAsync中嘗試加鎖之后,會啟動定時任務進行看門狗的進程

entryName:鎖的唯一標記

entry:線程ID信息的包裝類

private void scheduleExpirationRenewal(long threadId) {
  ExpirationEntry entry = new ExpirationEntry();
  //在EXPIRATION_RENEWAL_MAP中添加entryNme和entry的數據
  ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
  if (oldEntry != null) {
      oldEntry.addThreadId(threadId);
  } else {
      entry.addThreadId(threadId);
      //啟動一個開門狗的線程
      renewExpiration();
  }
}
private void renewExpiration() {
  ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
  if (ee == null) {
      return;
  }
  //在internalLockLeaseTime / 3時間后執行
  Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
      @Override
      public void run(Timeout timeout) throws Exception {
          ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
          if (ent == null) {
              return;
          }
          Long threadId = ent.getFirstThreadId();
          if (threadId == null) {
              return;
          }
          //看門狗的核心方法
          RFuture<Boolean> future = renewExpirationAsync(threadId);
          future.onComplete((res, e) -> {
              if (e != null) {
                  log.error("Can't update lock " + getName() + " expiration", e);
                  return;
              }
               
              if (res) {
                  // reschedule itself
                  //命令執行成功之后,再循環調用
                  renewExpiration();
              }
          });
      }
  }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
   
  ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
  //使用lua腳本進行鎖的延長
  return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
              "redis.call('pexpire', KEYS[1], ARGV[1]); " +
              "return 1; " +
          "end; " +
          "return 0;",
      Collections.<Object>singletonList(getName()),
      internalLockLeaseTime, getLockName(threadId));
}

總結:看門狗就是維護一個靜態的Map集合中,集合中有線程信息ID,在鎖沒有釋放的情況下,定時的重新賦值鎖的失效時間,達到延長鎖的機制。

5:RedLock

RedLock是基於redis實現的分布式鎖,它能夠保證以下特性:

  • 互斥性:在任何時候,只能有一個客戶端能夠持有鎖;避免死鎖:
  • 當客戶端拿到鎖后,即使發生了網絡分區或者客戶端宕機,也不會發生死鎖;(利用key的存活時間)
  • 容錯性:只要多數節點的redis實例正常運行,就能夠對外提供服務,加鎖或者釋放鎖; RedLock算法思想,意思是不能只在一個redis實例上創建鎖,應該是在多個redis實例上創建鎖,n / 2 + 1,必須在大多數redis節點上都成功創建鎖,才能算這個整體的RedLock加鎖成功,避免說僅僅在一個redis實例上加鎖而帶來的問題。

Redisson中有一個MultiLock的概念,可以將多個鎖合並為一個大鎖,對一個大鎖進行統一的申請加鎖以及釋放鎖。而Redisson中實現RedLock就是基於MultiLock 去做的。實現原理其實很簡單,基於RedLock思想,遍歷所有的Redis客戶端,然后依次加鎖,最后統計成功的次數來判斷是否加鎖成功。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM