Redisson分布式鎖總結


1、分布式鎖目前可能存在的問題(基於redis客戶端jedis)
加鎖: set key value [expiration EX seconds|PX milliseconds] [NX|XX]
該加鎖方式是從Redis2.8之后便支持這種原子性加鎖方式,之前設置setnx和設置過期時間不是原子性的。

String ret = jedis.set(getKey(key), 
                       new CacheEntity<T>(serializable, version, System.currentTimeMillis()).getBytes(), 
                       "NX".getBytes(),
                       "EX".getBytes(), 
                       expireSecond);
return StringUtils.equals("OK", ret);

參數解釋如下:

  • EX second :設置key的過期時間為 second 秒
  • PX millisecond :設置key的過期時間為 millisecond 毫秒
  • NX :只在key不存在時,才對key進行設置操作
  • XX :只在key已經存在時,才對key進行設置操作

解鎖

ret = jedis.del(getKey(key));

使用

if (!lock.tryLockXxx(key)) {
  return ;
}
try {
  // TODO
} finally { lock.unlockXxx(key); }

基於jedis實現的分布式鎖缺點

  • 不支持重入
  • 加鎖時設置的value無意義
  • 鎖超時時間不能自動續期,所以不好取值
  • 鎖超時時間 > 業務執行時間,業務正常執行完成釋放鎖,沒問題,業務節點奔潰,尚未釋放鎖,會導致其他業務系統線程最多等待整個超時時間
  • 鎖超時時間 < 業務執行時間,鎖超時自動釋放,業務執行完成,再執行unlock,可能會錯誤的解鎖
  • 加鎖key設置在Master節點上,尚未同步到slave就宕機了,salve提升為新的master,沒有上一個鎖的信息,其他線程依然可以獲取同一個key的鎖

2、Redisson客戶端優勢

  • 支持 SSL
  • 線程安全的實現
  • Reactive Streams API
  • 異步 API
  • 異步連接池
  • Lua 腳本
  • 分布式Java對象
  • Object holder, Binary stream holder, Geospatial holder, BitSet, AtomicLong, AtomicDouble, PublishSubscribe, Bloom filter, HyperLogLog
  • 分布式Java集合
  • Map, Multimap, Set, List, SortedSet, ScoredSortedSet, LexSortedSet, Queue, Deque, Blocking Queue, Bounded Blocking Queue, Blocking Deque, Delayed Queue, Priority Queue, Priority Deque
  • 分布式鎖和同步器
  • Lock, FairLock, MultiLock, RedLock, ReadWriteLock, Semaphore, PermitExpirableSemaphore, CountDownLatch

Redisson的基本用法參考這篇博客《基於redission的分布式鎖

自動續期源碼:

// tryAcquireAsync -> scheduleExpirationRenewal -> renewExpiration
// 自動續期
private void renewExpiration() {
    ...

    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }

            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                if (e != null) {
                    log.error("Can't update lock " + getRawName() + " expiration", e);
                    EXPIRATION_RENEWAL_MAP.remove(getEntryName());
                    return;
                }

                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

    ...
}

真正執行續期的lua腳本

protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                          "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                          "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                          "return 1; " +
                          "end; " +
                          "return 0;",
                          Collections.singletonList(getRawName()),
                          internalLockLeaseTime, getLockName(threadId));
}

Netty中的時間輪算法 HashedWheelTimer

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM