關於Redis分布式鎖網上有很多優秀的博文,這篇文章僅作為我這段時間遇到的新問題的記錄。
1.什么是分布式鎖:
在單機部署的情況下,為了保證數據的一致性,不出現臟數據等,就需要使用synchronized關鍵字、semaphore、ReentrantLock或者我們可以基於AQS定制鎖。鎖是在多線程間共享;在分布式部署情況下,鎖是在多進程間共享的;所以為了保證鎖在多進程之間的唯一性,就需要實現鎖在多進程之間的共享。
2.分布式鎖的特性:
2.1要保證某個時刻中只有一個服務的一個方法獲取到這個鎖
2.2要保證是可重入鎖(避免死鎖)
2.3要保證鎖的獲取和釋放的高可用。
3.分布式鎖考慮的要點:
3.1需要在何時釋放鎖(finally)
3.2鎖超時設置
3.3鎖刷新設置(timeOut)
3.4如果鎖超時了,為了避免誤刪了其他其他線程的鎖,可以將當前線程的id存入redis中,當前線程釋放鎖的時候,需要判斷存入redis的值是否為當前線程的id
3.5可重入
4.Redis分布式鎖:
RedisLockRegistry是spring-integration-redis中提供Redis的實現類;主要通過redis鎖+本地鎖兩個鎖方式實現。
4.1在pomx.xml文件中導入spring-integration-redis的依賴:
<!-- 分布式鎖支持 start--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-integration</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <!-- 分布式鎖支持 end-->
4.2 RedisLockRegistry類主要內部結構如圖:
RedisLockRegistry類的靜態String的常量OBTAIN_LOCK_SCRIPT是RedisLockRegistry類的一個上鎖的lua腳本。KEYS[1]代表當前鎖的key值,ARGV[1]代表當前的客戶端標識,ARGV[2]代表過期時間。
private static final String OBTAIN_LOCK_SCRIPT = "local lockClientId = redis.call('GET', KEYS[1])\n"+ "if lockClientId == ARGV[1] then\n"+ "redis.call('PEXPIRE', KEYS[1], ARGV[2])\n"+ "return true\n"+ "elseif not lockClientId then\n"+ "redis.call('SET', KEYS[1], ARGV[1], 'PX', ARGV[2])\n"+ "return true\n"+ "end\n"+
"return false";
基本邏輯就是:拿着KEYS[1]去redis中獲取值,如果值等於ARGV[1]就表示這條數據已經被上鎖了,並且延長鎖的過期時間,如果想要獲取鎖鎖就要等待拿到鎖的進程釋放鎖;如果這個鍵KEYS[1]不存在,那么設置KEYS[1]的值為ARGV[1],並且設置過期時間為ARGV[2],即當前進程就獲取到這個數據的鎖,並設置過期時間。(對lua腳本和redis命令不熟悉的可以上redis中文網)
4.3RedisLockRegistry類的內部類RedisLock的結構如下:
RedisLockRegistry類中獲取鎖的方法:
......
private final Map<String, RedisLockRegistry.RedisLock> locks;
......
public Lock obtain(Object lockKey) { Assert.isInstanceOf(String.class, lockKey); String path = (String)lockKey; return (Lock)this.locks.computeIfAbsent(path, (x$0) -> { return new RedisLockRegistry.RedisLock(x$0); }); }
如上面代碼顯示,locks是RedisLockRegistry類的Map類型的常量,以String類型作為key,以RedisLockRegistry的內部類RedisLock作為value;
拿着lockKey作為key去這個map中查找是否已經存在(即這條數據是否已經上鎖),如果存在就返回這個lockKey對應的RedisLock,如果不存在就創建一個RedisLock並將其以此lockKey為key放入map中。
每個分布式部署的應用都會自己創建一個RedisLockRegistry實例,到這里,同一個應用的多個線程都可以獲取到這條共享數據的RedisLock對象,本地鎖+Redis鎖真正開始於調用通過RedisLockRegistry實例.obtain(lockKey)方法獲取到的RedisLock實例對象.trylock()方法,參見下文。
4.4RedisLockRegistry類的內部類的屬性和部分構造方法:
private final class RedisLock implements Lock { private final String lockKey; private final ReentrantLock localLock; //用於記錄上鎖的時間 private volatile long lockedAt; private RedisLock(String path) { this.localLock = new ReentrantLock(); this.lockKey = this.constructLockKey(path); } ...... }
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { long now = System.currentTimeMillis(); if (!this.localLock.tryLock(time, unit)) { return false; } else { try { long expire = now + TimeUnit.MILLISECONDS.convert(time, unit); boolean acquired; while(!(acquired = this.obtainLock()) && System.currentTimeMillis() < expire) { Thread.sleep(100L); } if (!acquired) { this.localLock.unlock(); } return acquired; } catch (Exception var9) { this.localLock.unlock(); this.rethrowAsLockException(var9); return false; } } } private boolean obtainLock() { Boolean success = (Boolean)RedisLockRegistry.this.redisTemplate.execute(RedisLockRegistry.this.obtainLockScript, Collections.singletonList(this.lockKey), new Object[]{RedisLockRegistry.this.clientId, String.valueOf(RedisLockRegistry.this.expireAfter)}); boolean result = Boolean.TRUE.equals(success); if (result) { this.lockedAt = System.currentTimeMillis(); } return result; }
redisTemplate的execute方法參數:
第一個參數就是要執行的lua腳本;
第二個參數就是表示在腳本中所用到的那些 Redis 鍵(key),這些鍵名參數可以在 Lua 中通過全局變量 KEYS 數組,用 1 為基址的形式訪問( KEYS[1] , KEYS[2] ,以此類推);
第三個參數那些不是鍵名參數的附加參數 arg [arg …] ,可以在 Lua 中通過全局變量 ARGV 數組訪問,訪問的形式和 KEYS 變量類似( ARGV[1] 、 ARGV[2] ,諸如此類)
分析tryLock源碼可以看出,首先獲取本地鎖,如果獲取失敗,即表示某個請求線程已經獲取到了鎖,直接返回false;如果獲取成功,就調用obtainLock方法執行OBTAIN_LOCK_SCRIPT這段lua腳本來獲取redis鎖,判斷其他進程的某個請求線程獲取到了這個redis鎖,如果獲取redis失敗,則acquired變量為false,同時釋放本地鎖,tryLock方法直接返回false,獲取鎖失敗。
為什么要用本地鎖?一個是為了可重入,另一個是為了減輕redis服務器的壓力。
4.5 釋放鎖:
public void unlock() { if (!this.localLock.isHeldByCurrentThread()) { throw new IllegalStateException("You do not own lock at " + this.lockKey); } else if (this.localLock.getHoldCount() > 1) { this.localLock.unlock(); } else { try { if (!this.isAcquiredInThisProcess()) { throw new IllegalStateException("Lock was released in the store due to expiration. The integrity of data protected by this lock may have been compromised."); } if (Thread.currentThread().isInterrupted()) { RedisLockRegistry.this.executor.execute(this::removeLockKey); } else { this.removeLockKey(); } if (RedisLockRegistry.logger.isDebugEnabled()) { RedisLockRegistry.logger.debug("Released lock; " + this); } } catch (Exception var5) { ReflectionUtils.rethrowRuntimeException(var5); } finally { this.localLock.unlock(); } } }
釋放鎖的過程也比較簡單,第一步通過本地鎖判斷當前線程是否持有鎖,第二步通過本地鎖判斷當前線程持有鎖的計數。
如果當前線程持有鎖的計數 > 1,說明本地鎖被當前線程多次獲取,這時只釋放本地鎖(釋放之后當前線程持有鎖的計數-1)。
如果當前線程持有鎖的計數 = 1,釋放本地鎖和redis鎖。
redis分布式鎖的使用參見另一篇博文:springboot實現分布式鎖