1、前言
眾所周知,對於高並發業務場景通常會考慮加鎖機制保證線程安全,比如使用Synchronized對象鎖。Synchronized為JVM進程級別,在項目采取單實例部署情況下幾乎可以勝任。但是當項目采用分布式架構,考慮采用多實例高可用部署情況時,Synchronized對象鎖應對高並發場景已經力不從心。
分布式高可用部署架構:

那么,分布式部署架構下如何避免高並發造成的“超買/超賣現象”等類似線程安全問題呢?還好,目前也有不少成熟解決方案,整體上都是圍繞實現分布式鎖,常見的實現方案有:
- 基於Redis(緩存等)實現分布式鎖。
- 基於ZooKeeper實現分布式鎖。
- 基於數據庫實現分布式鎖。
本文將重點探討如何采用Redis緩存實現分布式鎖。
2、Redis SETNX
通常,采用Redis SETNX指令實現基於Redis實現分布式鎖。Redis為單線程模型,可以將高並發場景操作映射為單點指令操作。
Redis數據庫指令:SETNX key value ,SETNX 是『SET if Not eXists』(如果不存在,則 SET)的簡寫。
(Refer to :http://redisdoc.com/string/setnx.html)
-
指令特性
只在鍵 key 不存在的情況下, 將鍵 key 的值設置為 value 。
若鍵 key 已經存在, 則 SETNX 命令不做任何動作。
-
返回值
命令在設置成功時返回 1 , 設置失敗時返回 0 。
- setnx是Redis命令中的方法,java中對應的實現方法是setIfAbsent()。
3、代碼驗證
下文我將展示一段購物庫存簡單的demo示例,若采用分布式部署多實例,那么在高並發情況下會存在哪些重要問題。
本文將采用JMeter性能測試工具,模擬高並發業務場景,完成高並發壓力測試。
-
代碼
@PostMapping("/buyProduct1")
public String buyProduct1() {
String buyerName = "顧客" + Thread.currentThread().getId();
Object stObj = redisTemplate.opsForValue().get("stockNum");
int stockNum = Integer.parseInt(stObj.toString());
if (stockNum > 0) {
redisTemplate.opsForValue().set("stockNum", --stockNum);
System.out.println(buyerName + "下單成功,庫存剩余件數:" + stockNum);
} else {
System.out.println(buyerName + "下單失敗,庫存不足.");
return buyerName + "下單失敗!";
}
return buyerName + "下單成功!";
}
-
JMeter測試
JMeter設置10個用戶線程,0.5s內並發請求一次。

執行成功,模擬購物成功。

-
執行結果
從IDE控制台日志可以看到,0.5s內10次請求,出現了“超賣現象”,很明顯的線程安全問題。

當前代碼如果在單實例部署架構中,可以采用Synchronized對象鎖實現線程安全控制(在業務代碼上添加鎖),但是在分布式部署架構下將無法實現有效控制。
4、優化代碼
采用Redis實現分布式鎖,並對上述簡單代碼添加分布式鎖機制,實現線程安全控制。通常,也有兩種具體的實現方式,詳細見下文代碼實現。
方式一:基於Redis SETNX指令
-
代碼實現
@PostMapping("/buyProduct2")
public String buyProduct2() {
String buyerName = "顧客" + Thread.currentThread().getId();
String lockKey = "buyProductLock";
String lockValue = UUID.randomUUID().toString().concat(UUID.randomUUID().toString());
try {
// setIfAbsent是java中的方法,setnx是redis命令中的方法
// 1.保證系統崩潰可以自然釋放鎖
// 2.保證redis操作原子性,避免設置超時時刻系統崩潰
Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);
if (!isSuccess) {
System.out.println("系統繁忙,請稍后重試.");
return "系統繁忙,請稍后重試.";
}
int stockNum = Integer.parseInt(redisTemplate.opsForValue().get("stockNum").toString());
if (stockNum > 0) {
redisTemplate.opsForValue().set("stockNum", --stockNum);
System.out.println(buyerName + "下單成功,庫存剩余件數:" + stockNum);
} else {
System.out.println(buyerName + "下單失敗,庫存不足.");
return buyerName + "下單失敗!";
}
} finally {//3.保證操作成功和系統異常情況下都能釋放鎖
//4.采用線程標識主動檢查,保證僅刪除自己的鎖。避免redis超時時間小於業務邏輯執行時間,前一個線程釋放了后一個線程的加鎖,造成鎖永久失效。
//lockValue存儲方法棧中線程私有
if (lockValue.equals(redisTemplate.opsForValue().get(lockKey))) {
//釋放鎖
redisTemplate.delete(lockKey);
}
}
return buyerName + "下單成功!";
}
-
運行結果
部分線程執行成功,部分線程執行被攔截,保證了用戶並發下單庫存數據正確性,實現了線程安全控制。


方式二:采用Redisson 實現
-
實現原理

加鎖失敗情況下,可以設置超時時間T,在時間T內自旋加鎖,超過時間T之后加鎖失敗返回,避免死鎖。
當Redis集群為多Master-Slave模式時,Redis根據hash算法選擇一個master嘗試加鎖。
Redisson是通過執行lua腳本完成對Redis加鎖操作。
-
maven依賴
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.15.0</version>
</dependency>
-
Redisson配置
@Component public class redissonConfig { @Bean public Redisson redisson() { Config config = new Config(); config.useSingleServer() .setAddress("redis://127.0.0.1:6379") .setDatabase(0); return (Redisson) Redisson.create(config); } }
-
代碼實現
@PostMapping("/buyProduct3")
public String buyProduct3() {
String buyerName = "顧客" + Thread.currentThread().getId();
String lockKey = "buyProductLock";
// redisson加鎖
RLock redissonLock = redisson.getLock(lockKey);
try {
//redisson設置鎖時間
redissonLock.lock(10, TimeUnit.SECONDS);
int stockNum = Integer.parseInt(redisTemplate.opsForValue().get("stockNum").toString());
if (stockNum > 0) {
redisTemplate.opsForValue().set("stockNum", --stockNum);
System.out.println(buyerName + "下單成功,庫存剩余件數:" + stockNum);
} else {
System.out.println(buyerName + "下單失敗,庫存不足.");
return buyerName + "下單失敗!";
}
} finally {
//redisson釋放鎖
redissonLock.unlock();
}
return buyerName + "下單成功!";
}
-
執行結果


5、總結
-
比較
方法一與方法二都實現了在分布式部署場景下,控制高並發業務請求下線程安全。方法一攔截並發線程,直接結束在業務邏輯執行過程中其他線程並發請求,並發吞入量較小。方法二基於Redisson可以設置並發線程等待狀態,保證每個線程請求都能完成業務,提高了系統並發吞吐量。另外,方式二的實現代碼量較少。
方法一基於Redis指令面臨的問題:當Redis設置超時時間<應用程序執行時間,Redis分布式鎖先於程序執行完成釋放,導致當前加鎖失效。方式二Redisson分布式鎖,通過加鎖時候開啟分線程,定期(小於redis超時時間,eg:1/3)檢查redis鎖標記,如果存在再延時機制,解決了這類時間差問題。
-
共同存在的問題
解決方案:
6、防重復提交代碼優化
對前文《防止重復提交解決方案》進行代碼優化,支持高並發場景線程安全控制。
-
代碼
@Around("preventDuplication()")
public Object before(ProceedingJoinPoint joinPoint) throws Throwable {
ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder
.getRequestAttributes();
HttpServletRequest request = attributes.getRequest();
Assert.notNull(request, "request cannot be null.");
//獲取執行方法
Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
//獲取防重復提交注解
PreventDuplication annotation = method.getAnnotation(PreventDuplication.class);
// 獲取token以及方法標記,生成redisKey和redisValue
String token = request.getHeader(IdempotentConstant.TOKEN);
String redisKey = IdempotentConstant.PREVENT_DUPLICATION_PREFIX
.concat(token)
.concat(getMethodSign(method, joinPoint.getArgs()));
String redisValue = redisKey.concat(annotation.value()).concat("submit duplication");
System.out.print("當前線程號:" + Thread.currentThread().getId());
System.out.println("存儲redisKey: " + redisKey);
redisValue.concat(UUID.randomUUID().toString() + Thread.currentThread().getId());
try {
//設置防重復操作限時標記(前置通知)
//redisTemplate實現jedis.setnx(key,value),setIfAbsent 是java中的方法,setnx 是 redis命令中的方法
Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(redisKey, redisValue, annotation.expireSeconds(), TimeUnit.SECONDS);
System.out.println("當前線程號:" + Thread.currentThread().getId() + "," + "startTime:" + isSuccess);
long startTime = System.currentTimeMillis();
if (!isSuccess) {
throw new RuntimeException("請勿重復提交");
}
System.out.println("當前線程號:" + Thread.currentThread().getId() + "," + "startTime:" + startTime + "ms耗時");
//ProceedingJoinPoint類型參數可以決定是否執行目標方法,且環繞通知必須要有返回值,返回值即為目標方法的返回值
Object proceed = joinPoint.proceed();
long endStart = System.currentTimeMillis();
System.out.println("當前線程號:" + Thread.currentThread().getId() + "," + "endStart:" + endStart + "ms耗時");
return proceed;
} finally {
//釋放鎖校驗是否為當前線程
if (redisValue.equals(redisTemplate.opsForValue().get(redisKey))) {
//釋放鎖
redisTemplate.delete(redisKey);
}
}
}
7、源代碼
本文代碼已經上傳托管至GitHub以及Gitee,有需要的讀者請自行下載。
- GitHub:https://github.com/gavincoder/distributedlock.git
- Gitee:https://gitee.com/gavincoderspace/distributedlock.git
