【分布式鎖】Redis實現可重入的分布式鎖


一、前言

  之前寫的一篇文章《細說分布式鎖》介紹了分布式鎖的三種實現方式,但是Redis實現分布式鎖關於Lua腳本實現、自定義分布式鎖注解以及需要注意的問題都沒描述。本文就是詳細說明如何利用Redis實現重入的分布式鎖。

二、方案

死鎖問題

  當一個客戶端獲取鎖成功之后,假如它崩潰了導致它再也無法和 Redis 節點通信,那么它就會一直持有這個鎖,導致其它客戶端永遠無法獲得鎖了,因此鎖必須要有一個自動釋放的時間。
  我們需要保證setnx命令和expire命令以原子的方式執行,否則如果客戶端執行setnx獲得鎖后,這時客戶端宕機了,那么這把鎖沒有設置過期時間,導致其他客戶端永遠無法獲得鎖了。

鎖被其他線程釋放

  如果不加任何處理即簡單使用 SETNX 實現 Redis 分布式鎖,就會遇到一個問題:如果線程 C1 獲得鎖,但由於業務處理時間過長,鎖在線程 C1 還未處理完業務之前已經過期了,這時線程 C2 獲得鎖,在線程 C2 處理業務期間線程 C1 完成業務執行釋放鎖操作,但這時線程 C2 仍在處理業務線程 C1 釋放了線程 C2 的鎖,導致線程 C2 業務處理實際上沒有鎖提供保護機制;同理線程 C2 可能釋放線程 C3 的鎖,從而導致嚴重的問題。
  因此每個線程釋放鎖的時候只能釋放自己的鎖,即鎖必須要有一個擁有者的標記,並且也需要保證釋放鎖的原子性操作。
  在釋放鎖的時候判斷擁有者的標記(value是否相同),只有相同時才可以刪除,同時利用Lua腳本來達到原子操作,腳本如下:

  1. if redis.call("get", KEYS[1]) == ARGV[1] then 
  2. return redis.call("del", KEYS[1]) 
  3. else 
  4. return 0 
  5. end 

可重入問題

可重入鎖指的是可重復可遞歸調用的鎖,在外層使用鎖之后,在內層仍然可以使用,如果沒有可重入鎖的支持,在第二次嘗試獲得鎖時將會進入死鎖狀態。
這里有兩種解決方案:

  1. 客戶端在獲得鎖后保存value(擁有者標記),然后釋放鎖的時候將value和key同時傳過去。
  2. 利用ThreadLocal實現,獲取鎖后將Redis中的value保存在ThreadLocal中,同一線程再次嘗試獲取鎖的時候就先將 ThreadLocal 中的 值 與 Redis 的 value 比較,如果相同則表示這把鎖所以該線程,即實現可重入鎖。

這里的實現的方案是基於單機Redis,之前說的集群問題這里暫不考慮。

三、編碼

我們通過自定義分布式鎖注解+AOP可以更加方便的使用分布式鎖,只需要在加鎖的方法上加上注解即可。
Redis分布式鎖接口

/** * Redis分布式鎖接口 * Created by 2YSP on 2019/9/20. */
public interface IRedisDistributedLock {

  /** * * @param key * @param requireTimeOut 獲取鎖超時時間 單位ms * @param lockTimeOut 鎖過期時間,一定要大於業務執行時間 單位ms * @param retries 嘗試獲取鎖的最大次數 * @return */
  boolean lock(String key, long requireTimeOut, long lockTimeOut, int retries);

  /** * 釋放鎖 * @param key * @return */
  boolean release(String key);

}

Redis 分布式鎖實現類

/** * Redis 分布式鎖實現類 * Created by 2YSP on 2019/9/20. */
@Slf4j
@Component
public class RedisDistributedLockImpl implements IRedisDistributedLock {

  /** * key前綴 */
  public static final String PREFIX = "Lock:";
  /** * 保存鎖的value */
  private ThreadLocal<String> threadLocal = new ThreadLocal<>();

  private static final Charset UTF8 = Charset.forName("UTF-8");
  /** * 釋放鎖腳本 */
  private static final String UNLOCK_LUA;

  /* * 釋放鎖腳本,原子操作 */
  static {
    StringBuilder sb = new StringBuilder();
    sb.append("if redis.call(\"get\",KEYS[1]) == ARGV[1] ");
    sb.append("then ");
    sb.append(" return redis.call(\"del\",KEYS[1]) ");
    sb.append("else ");
    sb.append(" return 0 ");
    sb.append("end ");
    UNLOCK_LUA = sb.toString();
  }

  @Autowired
  private RedisTemplate redisTemplate;

  @Override
  public boolean lock(String key, long requireTimeOut, long lockTimeOut, int retries) {
    //可重入鎖判斷
    String originValue = threadLocal.get();
    if (!StringUtils.isBlank(originValue) && isReentrantLock(key, originValue)) {
      return true;
    }
    String value = UUID.randomUUID().toString();
    long end = System.currentTimeMillis() + requireTimeOut;
    int retryTimes = 1;

    try {
      while (System.currentTimeMillis() < end) {
        if (retryTimes > retries) {
          log.error(" require lock failed,retry times [{}]", retries);
          return false;
        }
        if (setNX(wrapLockKey(key), value, lockTimeOut)) {
          threadLocal.set(value);
          return true;
        }
        // 休眠10ms
        Thread.sleep(10);

        retryTimes++;
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    return false;
  }

  private boolean setNX(String key, String value, long expire) {
    /** * List設置lua的keys */
    List<String> keyList = new ArrayList<>();
    keyList.add(key);
    return (boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> {
      Boolean result = connection
          .set(key.getBytes(UTF8),
              value.getBytes(UTF8),
              Expiration.milliseconds(expire),
              SetOption.SET_IF_ABSENT);
      return result;
    });

  }

  /** * 是否為重入鎖 */
  private boolean isReentrantLock(String key, String originValue) {
    String v = (String) redisTemplate.opsForValue().get(key);
    return v != null && originValue.equals(v);
  }

  @Override
  public boolean release(String key) {
    String originValue = threadLocal.get();
    if (StringUtils.isBlank(originValue)) {
      return false;
    }
    return (boolean) redisTemplate.execute((RedisCallback<Boolean>) connection -> {
      return connection
          .eval(UNLOCK_LUA.getBytes(UTF8), ReturnType.BOOLEAN, 1, wrapLockKey(key).getBytes(UTF8),
              originValue.getBytes(UTF8));
    });
  }


  private String wrapLockKey(String key) {
    return PREFIX + key;
  }
}

分布式鎖注解

@Retention(value = RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface DistributedLock {

  /** * 默認包名加方法名 * @return */
  String key() default "";

  /** * 過期時間 單位:毫秒 * <pre> * 過期時間一定是要長於業務的執行時間. * </pre> */
  long expire() default 30000;

  /** * 獲取鎖超時時間 單位:毫秒 * <pre> * 結合業務,建議該時間不宜設置過長,特別在並發高的情況下. * </pre> */
  long timeout() default 3000;

  /** * 默認重試次數 * @return */
  int retryTimes() default Integer.MAX_VALUE;

}

aop切片類

@Component
@Aspect
@Slf4j
public class RedisLockAop {

  @Autowired
  private IRedisDistributedLock redisDistributedLock;


  @Around(value = "@annotation(lock)")
  public Object doAroundAdvice(ProceedingJoinPoint proceedingJoinPoint, DistributedLock lock) {
    // 加鎖
    String key = getKey(proceedingJoinPoint, lock);
    Boolean success = null;
    try {
        success = redisDistributedLock
          .lock(key, lock.timeout(), lock.expire(), lock.retryTimes());
      if (success) {
        log.info(Thread.currentThread().getName() + " 加鎖成功");
        return proceedingJoinPoint.proceed();
      }
      log.info(Thread.currentThread().getName() + " 加鎖失敗");
      return null;
    } catch (Throwable throwable) {
      throwable.printStackTrace();
      return null;
    } finally {
      if (success){
        boolean result = redisDistributedLock.release(key);
        log.info(Thread.currentThread().getName() + " 釋放鎖結果:{}",result);
      }
    }
  }

  private String getKey(JoinPoint joinPoint, DistributedLock lock) {
    if (!StringUtils.isBlank(lock.key())) {
      return lock.key();
    }
    return joinPoint.getSignature().getDeclaringTypeName() + "." + joinPoint.getSignature()
        .getName();
  }
}

業務邏輯處理類

@Service
public class TestService {

  @DistributedLock(retryTimes = 1000,timeout = 1000)
  public String lockTest() {
    try {
      System.out.println("模擬執行業務邏輯。。。");
      Thread.sleep(100);
    } catch (InterruptedException e) {
      e.printStackTrace();
      return "error";
    }

    return "ok";
  }
}

四、測試

  1. 啟動本地redis,啟動項目
  2. 打開cmd,利用ab壓力測試設置3個線程100個請求。

ab -c 3 -n 100 http://localhost:8000/lock/test

idea控制台輸出如下:

 

enter description here
enter description here

 

至此大功告成,代碼地址

ps: 遇到一個奇怪的問題,我用 RedisTemplate.execute(RedisScript script, List keys, Object... args) 這個方法,通過加載resource目錄下的lua腳本來釋放鎖的時候一直不成功,參數沒任何問題,而且我之前的文章就是用這個方法可以正確的釋放鎖。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM