原文鏈接:https://www.changxuan.top/?p=1230
在單體架構向分布式集群架構演進的過程中,項目中必不可少的一個功能組件就是分布式鎖。在開發團隊有技術積累的情況下,做為團隊的一個“工具人”往往有限的時間都投入到了業(C)務(U)開(R)發(D)上,並不會去深究工具類中的分布式鎖到底是如何實現的。大家只需要清楚如何使用某個同事寫好的 Redis 工具類就可以了。所以,今天就帶大家從零開始實現一個基於Redis的可以在項目中直接使用的分布式鎖。
首先,需要搞清楚一個問題,我們為什么需要分布式鎖或者說為什么需要鎖?下面我們通過一張圖來說明這個問題,
在上面這張圖中,同時有兩個線程 Thread A 和 Thread B 要做同一件事,你可以理解它們為要同時執行一個代碼塊。但是,執行這個代碼塊有個特殊的要求:不能有多個線程同時執行,不然系統數據就會出亂子!所有需要執行這段代碼的線程,需要挨個排隊來執行。所以,此時就需要有一個所有線程都能訪問到的一個變量,根據這個變量的狀態來判斷此時是否有其它線程正在執行,來決定當前線程是否能夠執行,那么這個變量就是“鎖”。假如設變量為 static volitate int lock = 0;
,當 lock
的值為 0 時,表明此時沒有線程在執行這段有特殊要求的代碼,當 lock
的值為 1 時,表明此時有其它線程在執行這段有特殊要求的代碼。當某個線程獲取到 lock
值為 0,且將 lock
值改為 1 的過程,稱為成功獲取鎖(注:此過程需要是原子性的);當該線程執行完這段代碼后,將 lock
的值改為 0 的操作稱為釋放鎖。
在分布式系統中,由於子系統需要支持水平擴展所以就不能把內存變量的狀態做為“一把鎖”了。不過我們可以把變量放到 Redis 中,這樣所有節點的線程都能夠訪問和操作了。
一、 一把簡單的“鎖”
“一口吃不成個大胖子”,我們先實現一個最簡單 Redis 鎖。我們通過逐漸發現問題並解決的過程,來加深理解。
根據前文所描述的鎖的基本原理,我首先寫了兩個方法,一個是獲取鎖 boolean lock(String key)
,一個是釋放鎖 void unlock(String key)
。
@RedisUtil.java
/**
* 加鎖
* @param key 鎖名稱
* @return lock 是否獲取鎖, true:獲取, false:未獲取
*/
public boolean lock(String key) {
boolean lock;
try {
// setIfAbsent 等價於 Redis 的 setnx 命令,具有原子性
lock = Boolean.TRUE.equals(redisTemplate.opsForValue().setIfAbsent(key, "LOCK"));
} catch (Exception e) {
log.error("獲取鎖:{},出現異常{}", key, e);
return false;
}
return lock;
}
/**
* 釋放鎖
* @param key 鎖名稱
*/
public void unlock(String key) {
if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {
redisTemplate.delete(key);
}
}
下面我們就寫代碼來測試一下,
測試思路:
創建十個線程調用 work() 方法
加鎖:方法獲取到鎖的線程對變量 count 執行1000次自增操作,未獲取到鎖的線程則不執行自增操作。
不加鎖:由於自增操作是非原子性的,所以最終 count 的結果會小於 10000 大於 1000 。
@WorkService.java
public static int count = 0;
private static final int TIME = 1000;
public void work() {
String key = "TEST_KEY";
if (redisUtil.lock(key)) {
log.info("線程:{},已經獲取鎖", Thread.currentThread().getName());
try {
for (int i = 0; i < TIME; i++) {
WorkService.count++;
}
}catch (Exception e) {
log.error("發生錯誤",e);
}finally {
redisUtil.unlock(key);
log.info("線程:{},已經釋放鎖", Thread.currentThread().getName());
}
} else {
log.info("線程:{},未獲取到鎖", Thread.currentThread().getName());
}
}
public void notLockWork() {
for (int i = 0; i < TIME; i++) {
WorkService.count++;
}
}
@WorkServiceTest.java
@Test
void work() throws Exception {
CountDownLatch downLatch = new CountDownLatch(10);
LinkedList<Thread> threads = new LinkedList<>();
for (int i = 0; i < 10; ++i) {
Thread thread = new Thread(() -> {
// 加鎖測試
workService.work();
// 未加鎖測試
// workService.notLockWork()
downLatch.countDown();
});
threads.add(thread);
}
for (Thread thread : threads) {
thread.start();
}
downLatch.await();
System.out.println("count = " + WorkService.count);
}
加鎖測試結果(控制台輸出)
2021-01-17 16:52:35.256 INFO 8648 --- [Thread-150] com.cxcoder.services.WorkService : 線程:Thread-150,已經獲取鎖
2021-01-17 16:52:35.277 INFO 8648 --- [Thread-152] com.cxcoder.services.WorkService : 線程:Thread-152,未獲取到鎖
2021-01-17 16:52:35.277 INFO 8648 --- [Thread-149] com.cxcoder.services.WorkService : 線程:Thread-149,未獲取到鎖
2021-01-17 16:52:35.277 INFO 8648 --- [Thread-154] com.cxcoder.services.WorkService : 線程:Thread-154,未獲取到鎖
2021-01-17 16:52:35.277 INFO 8648 --- [Thread-148] com.cxcoder.services.WorkService : 線程:Thread-148,未獲取到鎖
2021-01-17 16:52:35.277 INFO 8648 --- [Thread-153] com.cxcoder.services.WorkService : 線程:Thread-153,未獲取到鎖
2021-01-17 16:52:35.277 INFO 8648 --- [Thread-151] com.cxcoder.services.WorkService : 線程:Thread-151,未獲取到鎖
2021-01-17 16:52:35.277 INFO 8648 --- [Thread-146] com.cxcoder.services.WorkService : 線程:Thread-146,未獲取到鎖
2021-01-17 16:52:35.277 INFO 8648 --- [Thread-147] com.cxcoder.services.WorkService : 線程:Thread-147,未獲取到鎖
2021-01-17 16:52:35.277 INFO 8648 --- [Thread-155] com.cxcoder.services.WorkService : 線程:Thread-155,未獲取到鎖
2021-01-17 16:52:35.320 INFO 8648 --- [Thread-150] com.cxcoder.services.WorkService : 線程:Thread-150,已經釋放鎖
count = 1000
未加鎖測試結果(控制台輸出)
count = 3287
從控制台輸出結果來看,目前的鎖已經可以用來限制某塊代碼在某一時刻只能有一個線程在執行。但是也能夠發現,該鎖的實現機制還存在一些問題和不能滿足的需求。
例如,如果在項目里中出現”惡意代碼“或者不規范代碼的情況下則會出現預料之外的結果。看下面的例子,
@WorkService.java
public void work(){
String key = "TEST_KEY";
try {
if (redisUtil.lock(key)){
log.info("線程:{},獲取鎖", Thread.currentThread().getName());
... ... //(A)
}
}catch (Exception e) {
log.error("發生錯誤",e);
}finally {
redisUtil.unlock(key);
log.info("線程:{},釋放鎖", Thread.currentThread().getName());
}
}
如果 void work()
是上面的這種寫法,會出現什么問題呢?當有線程X
獲取到鎖后,正在執行 A 處的代碼時。這時 線程B
來到后獲取鎖失敗,卻執行了 finally
里的代碼將鎖給釋放了。此時 線程A
還在執行的過程中,又來了 線程C
獲取到鎖后也開始執行。所以,這個 Redis 鎖的實現機制存在一個比較嚴重的問題是某個線程所持有的鎖可以被其它線程隨意給釋放掉。另外,從控制台輸出的結果中可以看出在某個線程持有鎖的時間段內,其它線程是未被阻塞的。目前這個鎖應該被稱為存在問題的基於Redis的非阻塞的分布式鎖。