1、前言
眾所周知,對於高並發業務場景通常會考慮加鎖機制保證線程安全,比如使用Synchronized對象鎖。Synchronized為JVM進程級別,在項目采取單實例部署情況下幾乎可以勝任。但是當項目采用分布式架構,考慮采用多實例高可用部署情況時,Synchronized對象鎖應對高並發場景已經力不從心。
分布式高可用部署架構:
那么,分布式部署架構下如何避免高並發造成的“超買/超賣現象”等類似線程安全問題呢?還好,目前也有不少成熟解決方案,整體上都是圍繞實現分布式鎖,常見的實現方案有:
- 基於Redis(緩存等)實現分布式鎖。
- 基於ZooKeeper實現分布式鎖。
- 基於數據庫實現分布式鎖。
本文將重點探討如何采用Redis緩存實現分布式鎖。
2、Redis SETNX
通常,采用Redis SETNX指令實現基於Redis實現分布式鎖。Redis為單線程模型,可以將高並發場景操作映射為單點指令操作。
Redis數據庫指令:SETNX key value ,SETNX 是『SET if Not eXists』(如果不存在,則 SET)的簡寫。
(Refer to :http://redisdoc.com/string/setnx.html)
-
指令特性
只在鍵 key 不存在的情況下, 將鍵 key 的值設置為 value 。
若鍵 key 已經存在, 則 SETNX 命令不做任何動作。
-
返回值
命令在設置成功時返回 1 , 設置失敗時返回 0 。
- setnx是Redis命令中的方法,java中對應的實現方法是setIfAbsent()。
3、代碼驗證
下文我將展示一段購物庫存簡單的demo示例,若采用分布式部署多實例,那么在高並發情況下會存在哪些重要問題。
本文將采用JMeter性能測試工具,模擬高並發業務場景,完成高並發壓力測試。
-
代碼
@PostMapping("/buyProduct1") public String buyProduct1() { String buyerName = "顧客" + Thread.currentThread().getId(); Object stObj = redisTemplate.opsForValue().get("stockNum"); int stockNum = Integer.parseInt(stObj.toString()); if (stockNum > 0) { redisTemplate.opsForValue().set("stockNum", --stockNum); System.out.println(buyerName + "下單成功,庫存剩余件數:" + stockNum); } else { System.out.println(buyerName + "下單失敗,庫存不足."); return buyerName + "下單失敗!"; } return buyerName + "下單成功!"; }
-
JMeter測試
JMeter設置10個用戶線程,0.5s內並發請求一次。
執行成功,模擬購物成功。
-
執行結果
從IDE控制台日志可以看到,0.5s內10次請求,出現了“超賣現象”,很明顯的線程安全問題。
當前代碼如果在單實例部署架構中,可以采用Synchronized對象鎖實現線程安全控制(在業務代碼上添加鎖),但是在分布式部署架構下將無法實現有效控制。
4、優化代碼
采用Redis實現分布式鎖,並對上述簡單代碼添加分布式鎖機制,實現線程安全控制。通常,也有兩種具體的實現方式,詳細見下文代碼實現。
方式一:基於Redis SETNX指令
-
代碼實現
@PostMapping("/buyProduct2") public String buyProduct2() { String buyerName = "顧客" + Thread.currentThread().getId(); String lockKey = "buyProductLock"; String lockValue = UUID.randomUUID().toString().concat(UUID.randomUUID().toString()); try { // setIfAbsent是java中的方法,setnx是redis命令中的方法 // 1.保證系統崩潰可以自然釋放鎖 // 2.保證redis操作原子性,避免設置超時時刻系統崩潰 Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS); if (!isSuccess) { System.out.println("系統繁忙,請稍后重試."); return "系統繁忙,請稍后重試."; } int stockNum = Integer.parseInt(redisTemplate.opsForValue().get("stockNum").toString()); if (stockNum > 0) { redisTemplate.opsForValue().set("stockNum", --stockNum); System.out.println(buyerName + "下單成功,庫存剩余件數:" + stockNum); } else { System.out.println(buyerName + "下單失敗,庫存不足."); return buyerName + "下單失敗!"; } } finally {//3.保證操作成功和系統異常情況下都能釋放鎖 //4.采用線程標識主動檢查,保證僅刪除自己的鎖。避免redis超時時間小於業務邏輯執行時間,前一個線程釋放了后一個線程的加鎖,造成鎖永久失效。 //lockValue存儲方法棧中線程私有 if (lockValue.equals(redisTemplate.opsForValue().get(lockKey))) { //釋放鎖 redisTemplate.delete(lockKey); } } return buyerName + "下單成功!"; }
-
運行結果
部分線程執行成功,部分線程執行被攔截,保證了用戶並發下單庫存數據正確性,實現了線程安全控制。
方式二:采用Redisson 實現
-
實現原理
加鎖失敗情況下,可以設置超時時間T,在時間T內自旋加鎖,超過時間T之后加鎖失敗返回,避免死鎖。
當Redis集群為多Master-Slave模式時,Redis根據hash算法選擇一個master嘗試加鎖。
Redisson是通過執行lua腳本完成對Redis加鎖操作。
-
maven依賴
<dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.15.0</version> </dependency>
-
Redisson配置
@Component public class redissonConfig { @Bean public Redisson redisson() { Config config = new Config(); config.useSingleServer() .setAddress("redis://127.0.0.1:6379") .setDatabase(0); return (Redisson) Redisson.create(config); } }
-
代碼實現
@PostMapping("/buyProduct3") public String buyProduct3() { String buyerName = "顧客" + Thread.currentThread().getId(); String lockKey = "buyProductLock"; // redisson加鎖 RLock redissonLock = redisson.getLock(lockKey); try { //redisson設置鎖時間 redissonLock.lock(10, TimeUnit.SECONDS); int stockNum = Integer.parseInt(redisTemplate.opsForValue().get("stockNum").toString()); if (stockNum > 0) { redisTemplate.opsForValue().set("stockNum", --stockNum); System.out.println(buyerName + "下單成功,庫存剩余件數:" + stockNum); } else { System.out.println(buyerName + "下單失敗,庫存不足."); return buyerName + "下單失敗!"; } } finally { //redisson釋放鎖 redissonLock.unlock(); } return buyerName + "下單成功!"; }
-
執行結果
5、總結
-
比較
方法一與方法二都實現了在分布式部署場景下,控制高並發業務請求下線程安全。方法一攔截並發線程,直接結束在業務邏輯執行過程中其他線程並發請求,並發吞入量較小。方法二基於Redisson可以設置並發線程等待狀態,保證每個線程請求都能完成業務,提高了系統並發吞吐量。另外,方式二的實現代碼量較少。
方法一基於Redis指令面臨的問題:當Redis設置超時時間<應用程序執行時間,Redis分布式鎖先於程序執行完成釋放,導致當前加鎖失效。方式二Redisson分布式鎖,通過加鎖時候開啟分線程,定期(小於redis超時時間,eg:1/3)檢查redis鎖標記,如果存在再延時機制,解決了這類時間差問題。
-
共同存在的問題
解決方案:
6、防重復提交代碼優化
對前文《防止重復提交解決方案》進行代碼優化,支持高並發場景線程安全控制。
-
代碼
@Around("preventDuplication()") public Object before(ProceedingJoinPoint joinPoint) throws Throwable { ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder .getRequestAttributes(); HttpServletRequest request = attributes.getRequest(); Assert.notNull(request, "request cannot be null."); //獲取執行方法 Method method = ((MethodSignature) joinPoint.getSignature()).getMethod(); //獲取防重復提交注解 PreventDuplication annotation = method.getAnnotation(PreventDuplication.class); // 獲取token以及方法標記,生成redisKey和redisValue String token = request.getHeader(IdempotentConstant.TOKEN); String redisKey = IdempotentConstant.PREVENT_DUPLICATION_PREFIX .concat(token) .concat(getMethodSign(method, joinPoint.getArgs())); String redisValue = redisKey.concat(annotation.value()).concat("submit duplication"); System.out.print("當前線程號:" + Thread.currentThread().getId()); System.out.println("存儲redisKey: " + redisKey); redisValue.concat(UUID.randomUUID().toString() + Thread.currentThread().getId()); try { //設置防重復操作限時標記(前置通知) //redisTemplate實現jedis.setnx(key,value),setIfAbsent 是java中的方法,setnx 是 redis命令中的方法 Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(redisKey, redisValue, annotation.expireSeconds(), TimeUnit.SECONDS); System.out.println("當前線程號:" + Thread.currentThread().getId() + "," + "startTime:" + isSuccess); long startTime = System.currentTimeMillis(); if (!isSuccess) { throw new RuntimeException("請勿重復提交"); } System.out.println("當前線程號:" + Thread.currentThread().getId() + "," + "startTime:" + startTime + "ms耗時"); //ProceedingJoinPoint類型參數可以決定是否執行目標方法,且環繞通知必須要有返回值,返回值即為目標方法的返回值 Object proceed = joinPoint.proceed(); long endStart = System.currentTimeMillis(); System.out.println("當前線程號:" + Thread.currentThread().getId() + "," + "endStart:" + endStart + "ms耗時"); return proceed; } finally { //釋放鎖校驗是否為當前線程 if (redisValue.equals(redisTemplate.opsForValue().get(redisKey))) { //釋放鎖 redisTemplate.delete(redisKey); } } }
7、源代碼
本文代碼已經上傳托管至GitHub以及Gitee,有需要的讀者請自行下載。
- GitHub:https://github.com/gavincoder/distributedlock.git
- Gitee:https://gitee.com/gavincoderspace/distributedlock.git