redission 分布式鎖


概述

分布式系統有一個著名的理論CAP,指在一個分布式系統中,最多只能同時滿足一致性(Consistency)、可用性(Availability)和分區容錯性(Partition tolerance)這三項中的兩項。所以在設計系統時,往往需要權衡,在CAP中作選擇。當然,這個理論也並不一定完美,不同系統對CAP的要求級別不一樣,選擇需要考慮方方面面。

在微服務系統中,一個請求存在多級跨服務調用,往往需要犧牲強一致性老保證系統高可用,比如通過分布式事務,異步消息等手段完成。但還是有的場景,需要阻塞所有節點的所有線程,對共享資源的訪問。比如並發時“超賣”和“余額減為負數”等情況。

本地鎖可以通過語言本身支持,要實現分布式鎖,就必須依賴中間件,數據庫、redis、zookeeper等。

分布式鎖特性

不管使用什么中間件,有幾點是實現分布式鎖必須要考慮到的。

  1. 互斥:互斥好像是必須的,否則怎么叫鎖。

  2. 死鎖: 如果一個線程獲得鎖,然后掛了,並沒有釋放鎖,致使其他節點(線程)永遠無法獲取鎖,這就是死鎖。分布式鎖必須做到避免死鎖。

  3. 性能: 高並發分布式系統中,線程互斥等待會成為性能瓶頸,需要好的中間件和實現來保證性能。

  4. 鎖特性:考慮到復雜的場景,分布式鎖不能只是加鎖,然后一直等待。最好實現如Java Lock的一些功能如:鎖判斷,超時設置,可重入性等。

Redis實現之Redisson原理

redission實現了JDK中的Lock接口,所以使用方式一樣,只是Redssion的鎖是分布式的。如下:

RLock lock = redisson.getLock("className");
lock.lock();
try {
// do sth.
} finally {
lock.unlock();
}

好,Lock主要實現是RedissionLock。

先來看常用的Lock方法實現。

@Override
public void lock() {
try {
lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
@Override
public void lockInterruptibly() throws InterruptedException {
lockInterruptibly(-1, null);
}

再看lockInterruptibly方法:

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
// 獲取鎖
Long ttl = tryAcquire(leaseTime, unit, threadId);
if (ttl == null) { // 獲取成功
return;
}
 
// 異步訂閱redis chennel
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future); // 阻塞獲取訂閱結果
 
try {
while (true) {// 循環判斷知道獲取鎖
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
 
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);// 取消訂閱
}
}

總結lockInterruptibly:獲取鎖,不成功則訂閱釋放鎖的消息,獲得消息前阻塞。得到釋放通知后再去循環獲取鎖。

下面重點看看如何獲取鎖:Long ttl = tryAcquire(leaseTime, unit, threadId)

private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(leaseTime, unit, threadId));// 通過異步獲取鎖,但get(future)實現同步
}
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, final long threadId) {
if (leaseTime != -1) { //1 如果設置了超時時間,直接調用 tryLockInnerAsync
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//2 如果leaseTime==-1,則默認超時時間為30s
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(LOCK_EXPIRATION_INTERVAL_SECONDS, TimeUnit.SECONDS, threadId, RedisCommands.EVAL_LONG);
//3 監聽Future,獲取Future返回值ttlRemaining(剩余超時時間),獲取鎖成功,但是ttlRemaining,則刷新過期時間
ttlRemainingFuture.addListener(new FutureListener<Long>() {
@Override
public void operationComplete(Future<Long> future) throws Exception {
if (!future.isSuccess()) {
return;
}
 
Long ttlRemaining = future.getNow();
// lock acquired
if (ttlRemaining == null) {
scheduleExpirationRenewal(threadId);
}
}
});
return ttlRemainingFuture;
}

已經在注釋中解釋了,需要注意的是,此處用到了Netty的Future-listen模型,可以看看我的另一篇對Future的簡單講解:給Future一個Promise。

下面就是最重要的redis獲取鎖的方法tryLockInnerAsync:

<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(
getName(),
LongCodec.INSTANCE,
command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

這個方法主要就是調用redis執行eval lua,為什么使用eval,因為redis對lua腳本執行具有原子性。把這個方法翻譯一下:

-- 1. 沒被鎖{key不存在}
eval "return redis.call('exists', KEYS[1])" 1 myLock
-- (1) 設置Lock為key,uuid:threadId為filed, filed值為1
eval "return redis.call('hset', KEYS[1], ARGV[2], 1)" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
-- (2) 設置key過期時間{防止獲取鎖后線程掛掉導致死鎖}
eval "return redis.call('pexpire', KEYS[1], ARGV[1])" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
 
-- 2. 已經被同線程獲得鎖{key存在並且field存在}
eval "return redis.call('hexists', KEYS[1], ARGV[2])" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
-- (1) 可重入,但filed字段+1
eval "return redis.call('hincrby', KEYS[1], ARGV[2],1)" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
-- (2) 刷新過去時間
eval "return redis.call('pexpire', KEYS[1], ARGV[1])" 1 myLock 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
 
-- 3. 已經被其他線程鎖住{key存在,但是field不存在}:以毫秒為單位返回 key 的剩余超時時間
eval "return redis.call('pttl', KEYS[1])" 1 myLock

這就是核心獲取鎖的方式,下面直接釋放鎖方法unlockInnerAsync

-- 1. key不存在
eval "return redis.call('exists', KEYS[1])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
-- (1) 發送釋放鎖的消息,返回1,釋放成功
eval "return redis.call('publish', KEYS[2], ARGV[1])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
 
-- 2. key存在,但field不存在,說明自己不是鎖持有者,無權釋放,直接return nil
eval "return redis.call('hexists', KEYS[1], ARGV[3])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
eval "return nil"
 
-- 3. filed存在,說明是本線程在鎖,但有可能其他地方重入鎖,不能直接釋放,應該-1
eval "return redis.call('hincrby', KEYS[1], ARGV[3],-1)" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
 
-- 4. 如果減1后大於0,說明還有其他重入鎖,刷新過期時間,返回0。
eval "return redis.call('pexpire', KEYS[1], ARGV[2])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
 
-- 5. 如果不大於0,說明最后一把鎖,需要釋放
-- 刪除key
eval "return redis.call('del', KEYS[1])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
-- 發釋放消息
eval "return redis.call('publish', KEYS[2], ARGV[1])" 2 myLock redisson_lock__channel_lock 0 3000 3d7b5418-a86d-48c5-ae15-7fe13ef0034c:110
-- 返回1,釋放成功

從釋放鎖代碼中看到,刪除key后會發送消息,所以上文提到獲取鎖失敗后,阻塞訂閱此消息。

另外,上文提到刷新過期時間方法scheduleExpirationRenewal,指線程獲取鎖后需要不斷刷新失效時間,避免未執行完鎖就失效。這個方法的實現原理也類似,只是使用了Netty的TimerTask,每到過期時間1/3就去重新刷一次,如果key不存在則停止刷新。Timer實現大概如下:

private static void nettyTimer() {
final int expireTime = 6;
EventExecutorGroup group = new DefaultEventExecutorGroup(1);
final Timer timer = new HashedWheelTimer();
timer.newTimeout(timerTask -> {
Future<Boolean> future = group.submit(() -> {
System.out.println("刷新key的失效時間為"+expireTime +"秒");
return false;// 但key不存在時,返回true
});
future.addListener(future1 -> {
if (!future.getNow()) {
nettyTimer();
}
});
}, expireTime/3, TimeUnit.SECONDS);
}

 


參考列表:

  • 一分鍾實現分布式鎖

我這里准備了一些

【Java核心技術資料】

【JAVA核心總結】

【524頁中高級XXXX】

【《JavaGuide面試突擊》v3.0肝出來了!】

【Java高級筆試寶典覆蓋近3年Java筆試中98%高頻知識點吊打100家大廠面試官】

【Java大廠面試題】

【阿里架構師花近十年時間整理出來的Java核心知識pdf(Java崗)】

【524頁《Java中高級程序員必備核心知識》總結,令人猶如醍醐灌頂】

等一系列的Java架構資料 需要的話掃一掃免費獲取 無套路


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM