1.1 互斥
在分布式高並發的條件下,需要保證,同一時刻只能有一個線程獲得鎖。
1.2 防止死鎖
在分布式高並發的條件下,比如有個線程獲得鎖的同時,還沒有來得及去釋放鎖,就因為系統故障或者其它原因使它無法執行釋放鎖的命令,導致其它線程都無法獲得鎖,造成死鎖。
所以分布式非常有必要設置鎖的有效時間
,確保系統出現故障后,在一定時間內能夠主動去釋放鎖,避免造成死鎖的情況。
1.3 性能
對於訪問量大的共享資源,需要考慮減少鎖等待的時間,避免導致大量線程阻塞。
所以在鎖的設計時,需要考慮兩點。
-
1、
鎖的顆粒度要盡量小
。比如你要通過鎖來減庫存,那這個鎖的名稱你可以設置成是商品的ID,而不是任取名稱。這樣這個鎖只對當前商品有效,鎖的顆粒度小。
-
2、鎖的范圍盡量要小`。比如只要鎖2行代碼就可以解決問題的,那就不要去鎖10行代碼了。
1.4 可重入
ReentrantLock是可重入鎖,那它的特點就是:同一個線程可以重復拿到同一個資源的鎖。重入鎖非常有利於資源的高效利用。
2:Redission的原理分析
2.1 加鎖機制
線程去獲取鎖,獲取成功: 執行lua腳本,保存數據到redis數據庫。
線程去獲取鎖,獲取失敗: 一直通過while循環嘗試獲取鎖,獲取成功后,執行lua腳本,保存數據到redis數據庫。
2.2 watch dog自動延期機制
在一個分布式環境下,假如一個線程獲得鎖后,突然服務器宕機了,那么這個時候在一定時間后這個鎖會自動釋放,你也可以設置鎖的有效時間(不設置默認30秒),這樣的目的主要是防止死鎖的發生。
2.3 為啥要用lua腳本
主要是如果你的業務邏輯復雜的話,通過封裝在lua腳本中發送給redis,而且redis是單線程的,這樣就保證這段復雜業務邏輯執行的原子性。
在分布式鎖中,加鎖的操作需要多條命令,使用lua腳本保證了原子性。
2.4 可重入加鎖機制
Redisson可以實現可重入加鎖機制基於以下兩點進行實現:
1、Redis存儲鎖的數據類型是 Hash類型
2、Hash數據類型的key值包含了當前線程信息。
使用的是數據類型是Hash類型,Hash類型相當於我們java的 <key,<key1,value>>
類型,這里key是指 加鎖的key。例如:key1值為078e44a3-5f95-4e24-b6aa-80684655a15a:45
它的組成是:
guid + 當前線程的ID。后面的value就是重入的次數。
3:Redis分布式鎖的缺點
(1):如果使用的是單Master節點,那么可能會因為主備節點切換時候,鎖數據沒有同步完成就出現一次加鎖,可能出現問題。
4:代碼分析
4.1 RLock接口
其中繼承的接口Lock,是JUC中的接口,具有Lock中的能力。同時它還有很多新特性:強制鎖釋放,帶有效期的鎖。
public interface RLock extends Lock, RLockAsync
public interface RRLock {
//----------------------Lock接口方法-----------------------
/**
* 加鎖 鎖的有效期默認30秒
*/
void lock();
/**
* tryLock()方法是有返回值的,它表示用來嘗試獲取鎖,如果獲取成功,則返回true,如果獲取失敗(即鎖已被其他線程獲取),則返回false .
*/
boolean tryLock();
/**
* tryLock(long time, TimeUnit unit)方法和tryLock()方法是類似的,只不過區別在於這個方法在拿不到鎖時會等待一定的時間,
* 在時間期限之內如果還拿不到鎖,就返回false。如果如果一開始拿到鎖或者在等待期間內拿到了鎖,則返回true。
*
* @param time 等待時間
* @param unit 時間單位 小時、分、秒、毫秒等
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
/**
* 解鎖
*/
void unlock();
/**
* 中斷鎖 表示該鎖可以被中斷 假如A和B同時調這個方法,A獲取鎖,B為獲取鎖,那么B線程可以通過
* Thread.currentThread().interrupt(); 方法真正中斷該線程
*/
void lockInterruptibly();
//----------------------RLock接口方法-----------------------
/**
* 加鎖 上面是默認30秒這里可以手動設置鎖的有效時間
*
* @param leaseTime 鎖有效時間
* @param unit 時間單位 小時、分、秒、毫秒等
*/
void lock(long leaseTime, TimeUnit unit);
/**
* 這里比上面多一個參數,多添加一個鎖的有效時間
*
* @param waitTime 等待時間
* @param leaseTime 鎖有效時間
* @param unit 時間單位 小時、分、秒、毫秒等
*/
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
/**
* 檢驗該鎖是否被線程使用,如果被使用返回True
*/
boolean isLocked();
/**
* 檢查當前線程是否獲得此鎖(這個和上面的區別就是該方法可以判斷是否當前線程獲得此鎖,而不是此鎖是否被線程占有)
* 這個比上面那個實用
*/
boolean isHeldByCurrentThread();
/**
* 中斷鎖 和上面中斷鎖差不多,只是這里如果獲得鎖成功,添加鎖的有效時間
* @param leaseTime 鎖有效時間
* @param unit 時間單位 小時、分、秒、毫秒等
*/
void lockInterruptibly(long leaseTime, TimeUnit unit);
}
4.2:RLock實現類RedissonLock
public class RedissonLock extends RedissonExpirable implements RLock
4.2.1 void lock()方法
-
1:RedissionLock的Lock方法
@Override
public void lock() {
try {
lock(-1, null, false);
} catch (InterruptedException e) {
throw new IllegalStateException();
}
}
-
2:Lock帶參數
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
long threadId = Thread.currentThread().getId();
//1:開始進行嘗試獲取鎖
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
//2:訂閱鎖的釋放信息
RFuture<RedissonLockEntry> future = subscribe(threadId);
//3:等待鎖的釋放
if (interruptibly) {
commandExecutor.syncSubscriptionInterrupted(future);
} else {
commandExecutor.syncSubscription(future);
}
try {
while (true) {
//4:循環多次嘗試獲取鎖
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
//5:獲取鎖成功,返回
if (ttl == null) {
break;
}
// waiting for message
//6:休眠等待鎖被釋放的信號,釋放被喚醒
if (ttl >= 0) {
try {
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
if (interruptibly) {
throw e;
}
future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
}
} else {
if (interruptibly) {
future.getNow().getLatch().acquire();
} else {
future.getNow().getLatch().acquireUninterruptibly();
}
}
}
} finally {
//7:退訂鎖的信息
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
-
3:tryAcquire
異步嘗試進行加鎖,嘗試加鎖的時候leaseTime為-1。通常如果客戶端沒有加鎖成功,則會進行阻塞,leaseTime為鎖釋放的時間。
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
//使用get方法,獲取RFuture數據,沒有返回則阻塞
return get(tryAcquireAsync(leaseTime, unit, threadId));
}private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
//進行加鎖,返回RFuture
if (leaseTime != -1) {
//異步進行獲取鎖的操作
return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
//進行看門狗的處理,見4.2.3
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}生成lua腳本:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
上述lua腳本:
總結:
-
1:加鎖,使用Lock沒有攜帶過期時間,但是會使用一個默認值: lockWatchdogTimeout = 30 * 1000;
-
2:和ReentranLock一樣,在進行獲取鎖的時候,首先會先嘗試一次獲取鎖的操作,如果獲取鎖失敗使用信號量的方式,訂閱鎖的釋放信息,有了釋放鎖的信息,則進行嘗試加鎖,依此循環。
-
3:根據鎖釋放的邏輯,鎖釋放的時候會發布鎖解除的消息,應該在2中訂閱對應,在釋放鎖的時候釋放鎖的信息。
4.2.2 解鎖
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
-
1:unlockAsync方法
@Override
public RFuture<Void> unlockAsync(long threadId) {
RPromise<Void> result = new RedissonPromise<Void>();
//核心的解鎖流程
RFuture<Boolean> future = unlockInnerAsync(threadId);
//后取是解除看門狗進程
future.onComplete((opStatus, e) -> {
if (e != null) {
cancelExpirationRenewal(threadId);
result.tryFailure(e);
return;
}
if (opStatus == null) {
IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
+ id + " thread-id: " + threadId);
result.tryFailure(cause);
return;
}
cancelExpirationRenewal(threadId);
result.trySuccess(null);
});
return result;
}
-
2:解鎖unlockInnerAsync
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
/**
*(1):判斷key是否存在,不存在即釋放成功;
*(2):減少鎖定值1,減去后如果小於等於0,則釋放鎖,並且進行事件通知鎖釋放。
*/
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
4.2.3 看門狗實現
在方法:tryAcquireAsync中嘗試加鎖之后,會啟動定時任務進行看門狗的進程
entryName:鎖的唯一標記
entry:線程ID信息的包裝類
private void scheduleExpirationRenewal(long threadId) {
ExpirationEntry entry = new ExpirationEntry();
//在EXPIRATION_RENEWAL_MAP中添加entryNme和entry的數據
ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
if (oldEntry != null) {
oldEntry.addThreadId(threadId);
} else {
entry.addThreadId(threadId);
//啟動一個開門狗的線程
renewExpiration();
}
}
private void renewExpiration() {
ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ee == null) {
return;
}
//在internalLockLeaseTime / 3時間后執行
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
//看門狗的核心方法
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
// reschedule itself
//命令執行成功之后,再循環調用
renewExpiration();
}
});
}
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
//使用lua腳本進行鎖的延長
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.<Object>singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
5:RedLock
RedLock是基於redis實現的分布式鎖,它能夠保證以下特性:
- 互斥性:在任何時候,只能有一個客戶端能夠持有鎖;避免死鎖:
- 當客戶端拿到鎖后,即使發生了網絡分區或者客戶端宕機,也不會發生死鎖;(利用key的存活時間)
- 容錯性:只要多數節點的redis實例正常運行,就能夠對外提供服務,加鎖或者釋放鎖; RedLock算法思想,意思是不能只在一個redis實例上創建鎖,應該是在多個redis實例上創建鎖,n / 2 + 1,必須在大多數redis節點上都成功創建鎖,才能算這個整體的RedLock加鎖成功,避免說僅僅在一個redis實例上加鎖而帶來的問題。
Redisson中有一個MultiLock
的概念,可以將多個鎖合並為一個大鎖,對一個大鎖進行統一的申請加鎖以及釋放鎖。而Redisson中實現RedLock就是基於MultiLock