前言
官網的英文介紹大概如下:
Starting with version 4.0, the RedisLockRegistry is available. Certain components (for example aggregator and resequencer) use a lock obtained from a LockRegistry instance to ensure that only one thread is manipulating a group at a time. The DefaultLockRegistry performs this function within a single component; you can now configure an external lock registry on these components. When used with a shared MessageGroupStore, the RedisLockRegistry can be use to provide this functionality across multiple application instances, such that only one instance can manipulate the group at a time.
When a lock is released by a local thread, another local thread will generally be able to acquire the lock immediately. If a lock is released by a thread using a different registry instance, it can take up to 100ms to acquire the lock.
To avoid "hung" locks (when a server fails), the locks in this registry are expired after a default 60 seconds, but this can be configured on the registry. Locks are normally held for a much smaller time.
上述大概意思是RedisLockRegistry可以確保在分布式環境中,只有一個thread在執行,也就是實現了分布式鎖,當一個本地線程釋放了鎖,其他本地現場會立即去搶占鎖,如果鎖被占用了,那么會進行重試機制,100毫秒進行重試一次。同時也避免了"hung" locks 當服務器fails的時候。同時也給鎖設置了默認60秒的過期時間
如何獲取鎖

詳細流程如上圖所示,這里主要核心業務是這樣,首先Lock是java.util.concurrent.locks中的鎖,也就是本地鎖。然后自己用RedisLock實現了Lock接口而已,但是實際上RedisLock也使用了本地鎖。主要是通過redis鎖+本地鎖雙重鎖的方式實現的一個比較好的鎖。針對redis鎖來說只要能獲取到鎖,那么就算是成功的。如果獲取不到鎖就等待100毫秒繼續重試,如果獲取到鎖那么就采用本地鎖鎖住本地的線程。通過兩種方式很好的去實現了一個完善的分布式鎖機制。
下面代碼主要是獲取鎖的一個流程,先從本地鎖里面獲取,如果獲取到了那么和redis里面存放的RedisLock鎖做對比,判斷是否是同一個對象,如果不是那么就刪除本地鎖然后重新創建一個鎖返回
@Override
public Lock obtain(Object lockKey) {
Assert.isInstanceOf(String.class, lockKey);
//try to find the lock within hard references
//從本地強引用里面獲取鎖,
RedisLock lock = findLock(this.hardThreadLocks.get(), lockKey);
/*
* If the lock is locked, check that it matches what's in the store.
* If it doesn't, the lock must have expired.
*/
//這里主要判斷了這個鎖是否是鎖住的,如果不是的那么該鎖已經過期了
//如果強引用里面有這個鎖,並且lock.thread!=null,說明這個鎖沒有被占用
if (lock != null && lock.thread != null) {
//從redis獲取鎖,若如果redis鎖為空或者跟當前強引用的鎖不一致,可以確定兩個問題
//1.redis里面的鎖和本地的鎖不是一個了
//2.redis里面沒有鎖
RedisLock lockInStore = this.redisTemplate.boundValueOps(this.registryKey + ":" + lockKey).get();
if (lockInStore == null || !lock.equals(lockInStore)) {
//刪除強引用里面鎖
getHardThreadLocks().remove(lock);
lock = null;
}
}
//如果鎖==null
if (lock == null) {
//try to find the lock within weak references
//嘗試線從弱引用里面去找鎖
lock = findLock(this.weakThreadLocks.get(), lockKey);
//如果弱引用鎖==null 那么新建一個鎖
if (lock == null) {
lock = new RedisLock((String) lockKey);
//判斷是否用弱引用,如果用那么就加入到弱引用里面
if (this.useWeakReferences) {
getWeakThreadLocks().add(lock);
}
}
}
return lock;
}
上面獲取到的是RedisLock,RedisLock是實現java原生Lock接口,並重寫了lock()方法。首先從localRegistry中獲取到鎖,這里的鎖是java開發包里面的ReentrantLock。首先把本地先鎖住,然后再去遠程obtainLock。每次sleep() 100毫秒直到獲取到遠程鎖為止,代碼如下所示:
@Override
public void lock() {
//這里采用java開發包里面的ReentrantLock 進行多線程的加鎖,單機多線程的情況下解決並發的問題
Lock localLock = RedisLockRegistry.this.localRegistry.obtain(this.lockKey);
localLock.lock();
while (true) {
try {
while (!this.obtainLock()) {
Thread.sleep(100); //NOSONAR
}
break;
}
catch (InterruptedException e) {
/*
* This method must be uninterruptible so catch and ignore
* interrupts and only break out of the while loop when
* we get the lock.
*/
}
catch (Exception e) {
localLock.unlock();
rethrowAsLockException(e);
}
}
}
核心遠程鎖還是在RedisLock中,這里采用了redis事務+watch的方式,watch和事務都是redis里面自帶的。使用watch時候如果key的值發生了任何變化。那么exec()將不會執行,那么如下代碼返回的success就是false。從而來實現redis鎖的功能
private boolean obtainLock() {
//判斷創建這個類的線程和當前是否是一個,如果是就直接獲取鎖
Thread currentThread = Thread.currentThread();
if (currentThread.equals(this.thread)) {
this.reLock++;
return true;
}
//把當前鎖存到集合種
toHardThreadStorage(this);
/*
* Set these now so they will be persisted if successful.
*/
this.lockedAt = System.currentTimeMillis();
this.threadName = currentThread.getName();
Boolean success = false;
try {
success = RedisLockRegistry.this.redisTemplate.execute(new SessionCallback<Boolean>() {
@SuppressWarnings({"unchecked", "rawtypes"})
@Override
public Boolean execute(RedisOperations ops) throws DataAccessException {
String key = constructLockKey();
//監控key如果該key被改變了 那么該事務是不能被實現的會進行回滾
ops.watch(key); //monitor key
//如果key存在了就停止監控,如果key已經存在了 那么肯定是被別人占用了
if (ops.opsForValue().get(key) != null) {
ops.unwatch(); //key already exists, stop monitoring
return false;
}
ops.multi(); //transaction start
//設置一個值並加上過期時間 m默認是一分鍾左右的時間
//set the value and expire
//把鎖放入到redis中
ops.opsForValue()
.set(key, RedisLock.this, RedisLockRegistry.this.expireAfter, TimeUnit.MILLISECONDS);
//exec will contain all operations result or null - if execution has been aborted due to 'watch'
return ops.exec() != null;
}
});
}
finally {
//如果不成功那么把當前過期時間和鎖的名字設置成null
if (!success) {
this.lockedAt = 0;
this.threadName = null;
toWeakThreadStorage(this);
}
else {
//如果成功把當前鎖的thread名稱設置成currentThread
this.thread = currentThread;
if (logger.isDebugEnabled()) {
logger.debug("New lock; " + this.toString());
}
}
}
return success;
}
上面是整個加鎖的流程,基本流程比較簡單,看完加鎖應該自己都能解鎖,無非就是去除redis鎖和本地的鎖而已。
@Override
public void unlock() {
//判斷當前運行的線程和鎖的線程做對比,如果兩個線程不一樣那么拋出異常
if (!Thread.currentThread().equals(this.thread)) {
if (this.thread == null) {
throw new IllegalStateException("Lock is not locked; " + this.toString());
}
throw new IllegalStateException("Lock is owned by " + this.thread.getName() + "; " + this.toString());
}
try {
//如果reLock--小於=0的話就刪除redis里面的鎖
if (this.reLock-- <= 0) {
try {
this.assertLockInRedisIsUnchanged();
RedisLockRegistry.this.redisTemplate.delete(constructLockKey());
if (logger.isDebugEnabled()) {
logger.debug("Released lock; " + this.toString());
}
}
finally {
this.thread = null;
this.reLock = 0;
toWeakThreadStorage(this);
}
}
}
finally {
//拿到本地鎖,進行解鎖
Lock localLock = RedisLockRegistry.this.localRegistry.obtain(this.lockKey);
localLock.unlock();
}
}
tryLock在原有的加鎖上面增加了一個超時機制,主要是先通過本地的超時機制
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
//拿到本地鎖
Lock localLock = RedisLockRegistry.this.localRegistry.obtain(this.lockKey);
//先本地鎖進行tryLock
if (!localLock.tryLock(time, unit)) {
return false;
}
try {
long expire = System.currentTimeMillis() + TimeUnit.MILLISECONDS.convert(time, unit);
boolean acquired;
//這里添加了超時機制,跟之前的無限等待做了一個區分
while (!(acquired = obtainLock()) && System.currentTimeMillis() < expire) { //NOSONAR
Thread.sleep(100); //NOSONAR
}
//超時后沒有獲取到鎖,那么就把本地鎖進行解鎖
if (!acquired) {
localLock.unlock();
}
return acquired;
}
catch (Exception e) {
localLock.unlock();
rethrowAsLockException(e);
}
return false;
}
