代碼示例:
public static boolean acquire(RedissonClient redisson, String lockKey, Long waitTime, Long leaseTime) {
// 實例化鎖對象(此時未請求redis) RLock lock = redisson.getLock(lockKey); boolean lockResult; try { lock.lock();
// 加鎖 lockResult = lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS); }catch (Exception e){ lockResult = false; } if (lockResult){ log.info("======lock success <"+lockKey+">======"+Thread.currentThread().getName()); }else { log.error("======lock error <"+lockKey+">======"+Thread.currentThread().getName()); } //加鎖成功 return lockResult; }
流程圖:

源碼分析:
Redisson實例化
/************************Redisson************************/ //創建RedissonClient實現類對象 public static RedissonClient create(Config config) { Redisson redisson = new Redisson(config); if (config.isRedissonReferenceEnabled()) { redisson.enableRedissonReferenceSupport(); } return redisson; } //實例化 new Redisson(config); //方法被protected修飾, 只能通過靜態方法create(Config config)創建 protected Redisson(Config config) { this.config = config; //配置賦值 Config configCopy = new Config(config); //連接管理器 連接redis有多種模式(集群 托管 單例 主從 哨兵), 所以需要根據配置創建一種連接方式 connectionManager = ConfigSupport.createConnectionManager(configCopy); //執行器 依賴連接管理器 commandExecutor = new CommandSyncService(connectionManager); //回收管理器 evictionScheduler = new EvictionScheduler(commandExecutor); //編解碼器 codecProvider = config.getCodecProvider(); //解釋器 resolverProvider = config.getResolverProvider(); }
1 鎖實例化(只是實例化 未請求redis) RLock lock = redisson.getLock(lockKey)
簡述獲取鎖的過程
/**************************Redisson***********************/ @Override public RLock getLock(String name) { //commandExecutor命令執行器 //id redisson唯一ID(一般一組redis只注入一個redisson) return new RedissonLock(commandExecutor, name, id); } /**************************RedissonLock***********************/ protected RedissonLock(CommandExecutor commandExecutor, String name, UUID id) { super(commandExecutor, name); this.commandExecutor = commandExecutor; this.id = id; } /**************************RedissonExpirable***********************/ RedissonExpirable(CommandAsyncExecutor connectionManager, String name) { super(connectionManager, name); } /**************************RedissonObject***********************/ public RedissonObject(CommandAsyncExecutor commandExecutor, String name) { this(commandExecutor.getConnectionManager().getCodec(), commandExecutor, name); } public RedissonObject(Codec codec, CommandAsyncExecutor commandExecutor, String name) { this.codec = codec; this.name = name; this.commandExecutor = commandExecutor; }
2 請求rediss獲取鎖 lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS)
/************************RedissonLock************************/ public boolean tryLock(long waitTime /*獲取鎖等待時間*/, long leaseTime /*鎖過期時間*/, TimeUnit unit) throws InterruptedException { long time = unit.toMillis(waitTime); long current = System.currentTimeMillis(); //當前線程ID 用戶拼接key final long threadId = Thread.currentThread().getId(); //嘗試獲取鎖 如果ttl==null則獲取到鎖 ttl!=null則表示鎖已存在,ttl為鎖剩余時間(毫秒) //tryAcquire最終實現方法tryAcquireAsync在下面代碼 Long ttl = tryAcquire(leaseTime, unit); //獲取鎖成功 if (ttl == null) { return true; } //time = time-上一次嘗試獲取鎖之后所用時間 time -= (System.currentTimeMillis() - current); //這里有兩種情況 //1 若waitTime為負數(不失效緩存), 則返回false //2 若waitTime為正數 經過第一次嘗試獲取鎖之后未獲取成功 但已經超過等待時長 返回false if (time <= 0) { return false; } current = System.currentTimeMillis(); //訂閱消息 final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId); //阻塞等待一段時間(time) 等待訂閱結果 if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) { if (!subscribeFuture.cancel(false)) { subscribeFuture.addListener(new FutureListener<RedissonLockEntry>() { @Override public void operationComplete(Future<RedissonLockEntry> future) throws Exception { if (subscribeFuture.isSuccess()) { unsubscribe(subscribeFuture, threadId); } } }); } //嘗試獲取CountDownLatch鎖失敗時直接返回失敗 return false; } try { time -= (System.currentTimeMillis() - current); //超過waittime返回失敗 if (time <= 0) { return false; } //死循環 很簡單 就是在等待時長內重復獲取鎖 直到獲取成功或超時 while (true) { long currentTime = System.currentTimeMillis(); //嘗試獲取鎖 tryAcquire最終實現方法tryAcquireAsync在下面代碼 ttl = tryAcquire(leaseTime, unit); // 過期時間為null 則獲取成功 if (ttl == null) { return true; } time -= (System.currentTimeMillis() - currentTime); //依然判斷超時 if (time <= 0) { return false; } currentTime = System.currentTimeMillis(); //等待時間大於過期時間 那么就等已存在的過期之后再獲取 if (ttl >= 0 && ttl < time) { //阻塞時長ttl getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } //嘗試獲取鎖 else { //阻塞時長time getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS); } //再次計算time>0 time -= (System.currentTimeMillis() - currentTime); //若是小於等於0則為超時 直接返回false 否則進入下一次循環( while (true) ) if (time <= 0) { return false; } } } finally { //最終取消訂閱 訂閱代碼為 final RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId) unsubscribe(subscribeFuture, threadId); } } //嘗試獲取鎖(同步) private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) { //若過期時間為有限時間(leaseTime==-1為永不過期) if (leaseTime != -1) { return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } //以下為永不過期 //設置把永不過期時間改為30秒過期 RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG); //注冊監聽器 ttlRemainingFuture.addListener(new FutureListener<Long>() { //獲取鎖操作結束觸發下面操作 @Override public void operationComplete(Future<Long> future) throws Exception { //如果為獲取到鎖 不做任何事情 if (!future.isSuccess()) { return; } Long ttlRemaining = future.getNow(); // 獲取到了鎖 if (ttlRemaining == null) { //定期對鎖進行延時 達到永不過期目的 scheduleExpirationRenewal(threadId); } } }); return ttlRemainingFuture; } //嘗試獲取鎖(同步方式) //看下面代碼之前需要先了解Redis的Hash的數據結構, 下面腳本使用的就是Hash //String數據結構為 Map<Key, Value>, 通常根據Key取值 //Hash數據結構為Map<Key, Map<Field, Value>>, Field <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { //時間轉換 internalLockLeaseTime = unit.toMillis(leaseTime); //執行調用redis腳本 return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, //為了方便理解對下面數組進行解釋 KEYS[1]為Hash:設置的lockKey, ARGV[1]過期時間:leaseTime, ARGV[2]為Hash的Field:當前線程ID和當前redisson拼接 //如果key不存在 則設置當前key(KEYS[1]) field(ARGV[2]) value(1), 設置key(KEYS[1])過期時間ARGV[1] 返回空值(nil) "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //如果根據key和field能查詢到值, 則value在原來基礎上加1 //這里可能會有誤解: 上面已經判斷key已經存在, 因為這里是獨占鎖, //若根據此鎖不屬於當前線程(redis.call('hexists', KEYS[1], ARGV[2]) != 1), 則肯定被其他線程占有,獲取鎖失敗 "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + //返回當前key的剩余時間 "return redis.call('pttl', KEYS[1]);", //關鍵代碼 Collections.<Object>singletonList(getName()) /*KEYS*/, internalLockLeaseTime /*ARGV[1]*/, getLockName(threadId)/*ARGV[2]*/ ); }
額外拓展:
redisson使用大量的異步操作(基於netty),代碼比較難讀,下面針對訂閱lockKey的代碼畫出大致類結構圖 RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId)


