RedissonClient獲取鎖源碼解析


代碼示例:

public static boolean acquire(RedissonClient redisson, String lockKey, Long waitTime, Long leaseTime)  {
     // 實例化鎖對象(此時未請求redis) RLock lock
= redisson.getLock(lockKey); boolean lockResult; try { lock.lock();
       // 加鎖 lockResult
= lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS); }catch (Exception e){ lockResult = false; } if (lockResult){ log.info("======lock success <"+lockKey+">======"+Thread.currentThread().getName()); }else { log.error("======lock error <"+lockKey+">======"+Thread.currentThread().getName()); } //加鎖成功 return lockResult; }

 

流程圖:

 

源碼分析:

Redisson實例化

/************************Redisson************************/

//創建RedissonClient實現類對象
public static RedissonClient create(Config config) {
   Redisson redisson = new Redisson(config);
   if (config.isRedissonReferenceEnabled()) {
       redisson.enableRedissonReferenceSupport();
   }
   return redisson;
}

//實例化 new Redisson(config);
//方法被protected修飾, 只能通過靜態方法create(Config config)創建
protected Redisson(Config config) {
   this.config = config;
   //配置賦值
   Config configCopy = new Config(config);
   //連接管理器 連接redis有多種模式(集群 托管 單例 主從 哨兵), 所以需要根據配置創建一種連接方式
   connectionManager = ConfigSupport.createConnectionManager(configCopy);
   //執行器 依賴連接管理器
   commandExecutor = new CommandSyncService(connectionManager);
   //回收管理器
   evictionScheduler = new EvictionScheduler(commandExecutor);
   //編解碼器
   codecProvider = config.getCodecProvider();
   //解釋器
   resolverProvider = config.getResolverProvider();
}

1 鎖實例化(只是實例化 未請求redis) RLock lock = redisson.getLock(lockKey)

簡述獲取鎖的過程

/**************************Redisson***********************/
@Override
public RLock getLock(String name) {
  //commandExecutor命令執行器 
  //id redisson唯一ID(一般一組redis只注入一個redisson)
   return new RedissonLock(commandExecutor, name, id);
}


/**************************RedissonLock***********************/
protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) {
   super(commandExecutor, name);
   this.commandExecutor = commandExecutor;
   this.id = id;
}

/**************************RedissonExpirable***********************/
RedissonExpirable(CommandAsyncExecutor connectionManager, String name) {
   super(connectionManager, name);
}


/**************************RedissonObject***********************/
public RedissonObject(CommandAsyncExecutor commandExecutor, String name) {
   this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name);
}
public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) {
   this.codec = codec;
   this.name = name;
   this.commandExecutor = commandExecutor;
}

2 請求rediss獲取鎖  lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS)

/************************RedissonLock************************/
public boolean tryLock(long waitTime /*獲取鎖等待時間*/, long leaseTime /*鎖過期時間*/, TimeUnit unit) throws InterruptedException {
    long time = unit.toMillis(waitTime);
    long current = System.currentTimeMillis();
    //當前線程ID 用戶拼接key
    final long threadId = Thread.currentThread().getId();
    //嘗試獲取鎖 如果ttl==null則獲取到鎖 ttl!=null則表示鎖已存在,ttl為鎖剩余時間(毫秒)
    //tryAcquire最終實現方法tryAcquireAsync在下面代碼
    Long ttl = tryAcquire(leaseTime, unit);
    //獲取鎖成功
    if (ttl == null) {
        return true;
    }
    
    //time = time-上一次嘗試獲取鎖之后所用時間 
    time -= (System.currentTimeMillis() - current);
    //這里有兩種情況
    //1 若waitTime為負數(不失效緩存), 則返回false
    //2 若waitTime為正數 經過第一次嘗試獲取鎖之后未獲取成功 但已經超過等待時長 返回false
    if (time <= 0) {
        return false;
    }
    
    current = System.currentTimeMillis();
    //訂閱消息
    final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
    //阻塞等待一段時間(time) 等待訂閱結果
    if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
        if (!subscribeFuture.cancel(false)) {
            subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() {
                @Override
                public void operationComplete(Future<RedissonLockEntry> future) throws Exception {
                    if (subscribeFuture.isSuccess()) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                }
            });
        }
        //嘗試獲取CountDownLatch鎖失敗時直接返回失敗
        return false;
    }

    try {
        time -= (System.currentTimeMillis() - current);
        //超過waittime返回失敗
        if (time <= 0) {
            return false;
        }
        //死循環 很簡單 就是在等待時長內重復獲取鎖 直到獲取成功或超時
        while (true) {
            long currentTime = System.currentTimeMillis();
            //嘗試獲取鎖 tryAcquire最終實現方法tryAcquireAsync在下面代碼
            ttl = tryAcquire(leaseTime, unit);
            // 過期時間為null 則獲取成功
            if (ttl == null) {
                return true;
            }

            time -= (System.currentTimeMillis() - currentTime);
            //依然判斷超時
            if (time <= 0) {
                return false;
            }

            currentTime = System.currentTimeMillis();
            //等待時間大於過期時間 那么就等已存在的過期之后再獲取
            if (ttl >= 0 && ttl < time) {
                //阻塞時長ttl
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } 
            //嘗試獲取鎖
            else {
                //阻塞時長time
                getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
            }
            //再次計算time>0 
            time -= (System.currentTimeMillis() - currentTime);
            //若是小於等於0則為超時 直接返回false 否則進入下一次循環( while (true) )
            if (time <= 0) {
                return false;
            }
        }
    } finally {
        //最終取消訂閱 訂閱代碼為 final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId)
        unsubscribe(subscribeFuture, threadId);
    }
}

//嘗試獲取鎖(同步)
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
    //若過期時間為有限時間(leaseTime==-1為永不過期)
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    //以下為永不過期
    //設置把永不過期時間改為30秒過期
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
    //注冊監聽器
    ttlRemainingFuture.addListener(new FutureListener<Long>() {

        //獲取鎖操作結束觸發下面操作
        @Override
        public void operationComplete(Future<Long> future) throws Exception {
            //如果為獲取到鎖 不做任何事情
            if (!future.isSuccess()) {
                return;
            }
            Long ttlRemaining = future.getNow();
            // 獲取到了鎖
            if (ttlRemaining == null) {
                //定期對鎖進行延時 達到永不過期目的
                scheduleExpirationRenewal(threadId);
            }
        }
    });
    return ttlRemainingFuture;
}


//嘗試獲取鎖(同步方式)
//看下面代碼之前需要先了解Redis的Hash的數據結構, 下面腳本使用的就是Hash
//String數據結構為 Map<Key, Value>, 通常根據Key取值
//Hash數據結構為Map<Key, Map<Field, Value>>, Field
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    //時間轉換
    internalLockLeaseTime = unit.toMillis(leaseTime);
    //執行調用redis腳本 
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
              //為了方便理解對下面數組進行解釋 KEYS[1]為Hash:設置的lockKey, ARGV[1]過期時間:leaseTime, ARGV[2]為Hash的Field:當前線程ID和當前redisson拼接
              //如果key不存在 則設置當前key(KEYS[1]) field(ARGV[2]) value(1), 設置key(KEYS[1])過期時間ARGV[1] 返回空值(nil)
              "if (redis.call('exists', KEYS[1]) == 0) then " +
                  "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              //如果根據key和field能查詢到值, 則value在原來基礎上加1
              //這里可能會有誤解: 上面已經判斷key已經存在, 因為這里是獨占鎖, 
              //若根據此鎖不屬於當前線程(redis.call('hexists', KEYS[1], ARGV[2]) != 1), 則肯定被其他線程占有,獲取鎖失敗
              "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                  "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                  "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                  "return nil; " +
              "end; " +
              //返回當前key的剩余時間
              "return redis.call('pttl', KEYS[1]);",
                //關鍵代碼
                Collections.<Object>singletonList(getName()) /*KEYS*/, internalLockLeaseTime /*ARGV[1]*/, getLockName(threadId)/*ARGV[2]*/ );
}

 

額外拓展:

redisson使用大量的異步操作(基於netty),代碼比較難讀,下面針對訂閱lockKey的代碼畫出大致類結構圖 RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId)

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM