Redis分布式鎖和RedissionLock可重入分布式鎖源碼解讀
本文主要講三個部分
1,分布式鎖的基本特性
2,設計一個可用的redis分布式鎖及會遇到的重要問題和解決辦法
3,RedissionLock的lock和unlock的源碼分析
分布式鎖
在分布式模式下,對一份臨界資源需要跨主機跨進程跨線程互斥訪問的時候,需要用分布式鎖,來保證多進程有序操作
分布式特點
1,互斥性:只能有一個線程擁有該鎖
2,鎖超時避免死鎖:當該線程發生異常,能讓其他線程獲取
3,容錯性,高可用
4,高性能
5,可重入:即當前獲取該鎖的線程,可以繼續獲取執行lock獲取該鎖,每次lock可重入計數+1,unlock可重入計數-1,直到這個值為0,視為徹底釋放該鎖,參照JUC下ReentrantLock
6,具備阻塞和非阻塞性:當獲取不到的線程,能夠被及時喚醒
分布式類型:
1,基於數據庫的悲觀鎖:X鎖
2,基於數據庫的樂觀鎖:基於版本號
3,基於redis的分布式鎖
4,基於zookeeper的分布式鎖
注:這篇文章主要講redis分布式鎖,以及具體實現RedissionLock可重入分布式鎖源碼解讀
可重入分布式鎖的設計
1,要保證互斥性和可重入性:需要 記錄獲取該鎖的線程信息,例如線程id,以及記錄可重入計數(即lock的次數)
2,鎖超時避免死鎖:需要對redis key設置超時淘汰,但是多長時間淘汰,是個問題,后面討論
3,容錯性,高可用,高性能:基於redis集群,則暫時不用考慮
4,獲取不到鎖的阻塞線程,需要被及時喚醒:可以基於redis發布訂閱,用來通知等待鎖的線程
如上設計需要解決的兩個問題
1,用記錄獲取鎖的線程id,來實現互斥以及判斷可重入,但是在linux下線程id,是不保證唯一的,更何況還要跨主機進程,這個也是需要好好思考的問題
2,避免死鎖,需要設置鎖超時,那設置多長時間合適?以及設置超時並獲取鎖之后,在java語言下,還存在因為gc,工作線程會暫定運行,即會存在STW (stop the world)問題導致多線程同時獲取鎖的異常情況的解決辦法
java gc STW (stop the word)導致的鎖過期問題
1,工作線程1,獲取鎖,並設置了超時淘汰時長
2,jvm gc垃圾回收時,會暫停工作線程,即STW
3,當工作線程1恢復工作的時候,由於STW的時長稍長,可能鎖已經超時淘汰了,但是該線程還不知道,此時工作線程2去獲取,也是能獲取到的,導致出現多個線程獲取同一個鎖的異常問題,如下圖所示
大概的解決方案,有:
1: 模擬CAS樂觀鎖的方式,增加版本號,如下圖
2: watch dog自動延期機制,在后面介紹RedissionLock時會介紹
注意:單機版的watch dog 並不能解決 STW的過期問題, 需要分布式版本的 watch dog, 獨立的看門狗服務。因為單機版,仍然受gc STW影響。這個問題特別容易被忽略,盡管gc STW時間通常不會太長,至少理論上需要考慮。 鎖刪除之后, 取消看門狗服務的對應的key記錄, 當然,這就使得系統變得復雜, 還要保證看門狗服務的高並發、高可用、數據一致性的問題。
RedissionLock 大致框架邏輯圖
鎖實現的主要數據結構
1,基於hash數據結構,hash key=lockName,保存了具有唯一性的threadId(例如uuid:threadId結構)和可重入計數,該hash下有且只有一個子元素,hincrby lockName uniqueThreadId,1
2,對lockName設置超時淘汰,pexpire lockName expireTime ,一般30秒超時淘汰,或者自己unlock及時釋放
3,基於發布訂閱實現watch dog:pulish redisson_lock__channel:{lockName} LockPubSub.UNLOCK_MESSAGE
RedissionLock.lock
1,鎖不存在,則設置鎖,線程id,和超時淘汰
2,鎖存在,可重入,則可重入計數增1
3,獲取了鎖,則添加watch dog定時線程定時刷新超時淘汰時間,默認每10秒執行一次
4,沒獲取到鎖,則訂閱鎖釋放通知,等待通知
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {//是當前線程獲取到鎖了,並且會重新設置超時淘汰時間和自增可重入次數
return;
}
//加鎖操作失敗,訂閱消息,利用 redis 的 pubsub 提供一個通知機制來減少不斷的重試,避免發生活鎖。
//活鎖:是指線程 1 可以使用資源,但它很禮貌,讓其他線程先使用資源,線程 2 也可以使用資源,但它很紳士,也讓其他線程先使用資源。這樣你讓我,我讓你,最后兩個線程都無法使用資源
//在lock中執行訂閱,unlock-> unlockInnerAsync 則發布publish redisson_lock__channel:{lockName} LockPubSub.UNLOCK_MESSAGE=0 通知等待的線程去獲取鎖
RFuture<RedissonLockEntry> future = subscribe(threadId);
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
ttl = tryAcquire(-1, leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
//當前線程沒有拿到鎖,先在上面訂閱了channel key=redisson_lock__channel:{lockName}
//在如下則,等待在拿到鎖的線程unlock之后可重入計數=0時,
//會pulish redisson_lock__channel:{lockName} LockPubSub.UNLOCK_MESSAGE 一個信息,則喚醒等待獲取鎖的線程去tryAcquire
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
unsubscribe(future, threadId);
}
}
private Long tryAcquire(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
return get(tryAcquireAsync(waitTime, leaseTime, unit, threadId));
}
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
RFuture<Long> ttlRemainingFuture;
if (leaseTime != -1) {
ttlRemainingFuture = tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
} else {
ttlRemainingFuture = tryLockInnerAsync(waitTime, internalLockLeaseTime,//key超時淘汰 30秒=30*1000
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
}
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
if (leaseTime != -1) {
internalLockLeaseTime = unit.toMillis(leaseTime);
} else {
scheduleExpirationRenewal(threadId);//添加watch dog
}
}
});
return ttlRemainingFuture;
}
加鎖lua腳本
1,邏輯調用鏈:lock->tryAcquire->tryLockInnerAsync
/**
* 1,如果key(lockName)不存在,(key=getLock中參數name),則 hincrby key hash-key,hash-value
* 2,設置key毫秒超時淘汰
*
* 3,如果key存在,並且包含元素hash-key,執行如上1和2,hash-key=commandExecutor.getConnectionManager().getId()+":"+threadId
*
* 4,如果key存在,並且不包含元素hash-key,則獲取超時淘汰時間長
* @param waitTime
* @param leaseTime
* @param unit
* @param threadId
* @param command
* @param <T>
* @return
*/
<T> RFuture<T> tryLockInnerAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.singletonList(getRawName()), unit.toMillis(leaseTime), getLockName(threadId));
}
lua腳本保證原子性,lua字段解釋
1,KEYS[1]:表示你加鎖的那個key,比如說RLock lock = redisson.getLock(“myLock”);這里你自己設置了加鎖的那個鎖key就是“myLock”。
2,ARGV[1]:表示鎖的有效期,默認30s
3,ARGV[2]:表示表示加鎖的線程ID,類似於:8743c9c0-0795-4907-87fd-6c719a6b4586:1,大致是uuid:threadId,用來避免threadId重復,又可以實現互斥和可重入判斷
RedissionLock.unlock
1,可重入計數-1,當可重入計數=0時,則發布一條消息用於通知等待的線程
2,刪除watch dog
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
}
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<>();
RFuture<Boolean> future = unlockInnerAsync(threadId);
future.onComplete((opStatus, e) -> {
cancelExpirationRenewal(threadId);//刪除watch dog
if (e != null) {
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
result.trySuccess(null);
});
return result;
}
lua解鎖腳本
/**
* 1,如果不存在key 和hash-key,則直接退出
* 2,存在key和hash-key,則可重入計數減一
* 3,當可重入計數還大於0,則更新一下淘汰時長,還需要繼續被同一個線程解鎖,因為同一個線程下可重入鎖被多次lock
* 4,如果可重入計數等於0,則可重入可以徹底釋放了,則刪除這個key,並發布訂閱,讓之前等待該鎖的線程進入
* @param threadId
* @return
*/
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " + ////在lock中有訂閱這個可以,unlock 則publish redisson_lock__channel:{lockName} LockPubSub.UNLOCK_MESSAGE=0
"return 1; " +
"end; " +
"return nil;",
Arrays.asList(getRawName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
watch dog 邏輯
1,watchDog添加邏輯:lock->tryAcquire->tryAcquireAsync->獲取鎖之后執行scheduleExpirationRenewal(創建一個定時任務,並把這個定時任務加入EXPIRATION_RENEWAL_MAP)->{lua腳本:存在,則重試淘汰時長,默認30秒}
2,創建HashedWheelTimer.HashedWheelTimeout,internalLockLeaseTime / 3間隔循環執行,默認是30/3=10秒,去刷新超時淘汰時間
3,watchDog刪除邏輯:unlock->unlockAsync-> cancelExpirationRenewal(threadId){EXPIRATION_RENEWAL_MAP.remove}
注意:單機版的watch dog 並不能解決 STW的過期問題, 需要分布式版本的 watch dog, 獨立的看門狗服務。因為單機版,仍然受gc STW影響。這個問題特別容易被忽略,盡管gc STW時間通常不會太長,至少理論上需要考慮
protected void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
renewExpiration();
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getRawName() + " expiration", e);
EXPIRATION_RENEWAL_MAP.remove(getEntryName());
return;
}
if (res) {
// reschedule itself
renewExpiration();
} else {
cancelExpirationRenewal(null);
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getRawName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getRawName()),
internalLockLeaseTime, getLockName(threadId));
}
設計結構上最大的問題:
1,異步復制,master和slave不一致,master宕機,slave則沒有該鎖記錄,導致多端獲取該鎖:就是如果你對某個redis master實例,寫入了myLock這種鎖key的value,此時會 異步復制 給對應的master slave實例。但是這個過程中一旦發生redis master宕機,主備切換,redis slave變為了redis master。接着就會導致,客戶端2來嘗試加鎖的時候,在新的redis master上完成了加鎖,而客戶端1也以為自己成功加了鎖。此時就會導致多個客戶端對一個分布式鎖完成了加鎖。這時系統在業務上一定會出現問題,導致臟數據的產生。所以這個就是redis cluster,或者是redis master-slave架構的主從異步復制導致的redis分布式鎖的最大缺陷:在redis master實例宕機的時候,可能導致多個客戶端同時完成加鎖
2,需要設計分布式的watch dog機制以避免GC STW問題,這個問題,在redission中,目前待后面閱讀補充