基於Redis實現分布式鎖


1、前言

  眾所周知,對於高並發業務場景通常會考慮加鎖機制保證線程安全,比如使用Synchronized對象鎖。Synchronized為JVM進程級別,在項目采取單實例部署情況下幾乎可以勝任。但是當項目采用分布式架構,考慮采用多實例高可用部署情況時,Synchronized對象鎖應對高並發場景已經力不從心。

分布式高可用部署架構:

  那么,分布式部署架構下如何避免高並發造成的“超買/超賣現象”等類似線程安全問題呢?還好,目前也有不少成熟解決方案,整體上都是圍繞實現分布式鎖,常見的實現方案有:

  • 基於Redis(緩存等)實現分布式鎖。
  • 基於ZooKeeper實現分布式鎖。
  • 基於數據庫實現分布式鎖。

  本文將重點探討如何采用Redis緩存實現分布式鎖。

 

2、Redis SETNX

  通常,采用Redis SETNX指令實現基於Redis實現分布式鎖。Redis為單線程模型,可以將高並發場景操作映射為單點指令操作。

  Redis數據庫指令:SETNX key value  ,SETNX 是『SET if Not eXists』(如果不存在,則 SET)的簡寫。

  (Refer to :http://redisdoc.com/string/setnx.html)

  • 指令特性

只在鍵 key 不存在的情況下, 將鍵 key 的值設置為 value 。
若鍵 key 已經存在, 則 SETNX 命令不做任何動作。
  • 返回值

命令在設置成功時返回 1 , 設置失敗時返回 0
  • setnx是Redis命令中的方法,java中對應的實現方法是setIfAbsent()。

 

3、代碼驗證

   下文我將展示一段購物庫存簡單的demo示例,若采用分布式部署多實例,那么在高並發情況下會存在哪些重要問題。

本文將采用JMeter性能測試工具,模擬高並發業務場景,完成高並發壓力測試。 

  • 代碼

    @PostMapping("/buyProduct1")
    public String buyProduct1() {
        String buyerName = "顧客" + Thread.currentThread().getId();
        Object stObj = redisTemplate.opsForValue().get("stockNum");
        int stockNum = Integer.parseInt(stObj.toString());
        if (stockNum > 0) {
            redisTemplate.opsForValue().set("stockNum", --stockNum);
            System.out.println(buyerName + "下單成功,庫存剩余件數:" + stockNum);
        } else {
            System.out.println(buyerName + "下單失敗,庫存不足.");
            return buyerName + "下單失敗!";
        }
        return buyerName + "下單成功!";
    }
  • JMeter測試

  JMeter設置10個用戶線程,0.5s內並發請求一次。

  執行成功,模擬購物成功。

 

  •  執行結果

  從IDE控制台日志可以看到,0.5s內10次請求,出現了“超賣現象”,很明顯的線程安全問題。

 

   當前代碼如果在單實例部署架構中,可以采用Synchronized對象鎖實現線程安全控制(在業務代碼上添加鎖),但是在分布式部署架構下將無法實現有效控制。

4、優化代碼

   采用Redis實現分布式鎖,並對上述簡單代碼添加分布式鎖機制,實現線程安全控制。通常,也有兩種具體的實現方式,詳細見下文代碼實現。

方式一:基於Redis SETNX指令

  • 代碼實現

    @PostMapping("/buyProduct2")
    public String buyProduct2() {
        String buyerName = "顧客" + Thread.currentThread().getId();
        String lockKey = "buyProductLock";
        String lockValue = UUID.randomUUID().toString().concat(UUID.randomUUID().toString());
        try {
            // setIfAbsent是java中的方法,setnx是redis命令中的方法
            // 1.保證系統崩潰可以自然釋放鎖
            // 2.保證redis操作原子性,避免設置超時時刻系統崩潰
            Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, lockValue, 10, TimeUnit.SECONDS);
            if (!isSuccess) {
                System.out.println("系統繁忙,請稍后重試.");
                return "系統繁忙,請稍后重試.";
            }
            int stockNum = Integer.parseInt(redisTemplate.opsForValue().get("stockNum").toString());
            if (stockNum > 0) {
                redisTemplate.opsForValue().set("stockNum", --stockNum);
                System.out.println(buyerName + "下單成功,庫存剩余件數:" + stockNum);
            } else {
                System.out.println(buyerName + "下單失敗,庫存不足.");
                return buyerName + "下單失敗!";
            }
        } finally {//3.保證操作成功和系統異常情況下都能釋放鎖
            //4.采用線程標識主動檢查,保證僅刪除自己的鎖。避免redis超時時間小於業務邏輯執行時間,前一個線程釋放了后一個線程的加鎖,造成鎖永久失效。
            //lockValue存儲方法棧中線程私有
            if (lockValue.equals(redisTemplate.opsForValue().get(lockKey))) {
                //釋放鎖
                redisTemplate.delete(lockKey);
            }
        }
        return buyerName + "下單成功!";
    }
  • 運行結果

  部分線程執行成功,部分線程執行被攔截,保證了用戶並發下單庫存數據正確性,實現了線程安全控制。

 

 

 

方式二:采用Redisson 實現

  • 實現原理

 

  加鎖失敗情況下,可以設置超時時間T,在時間T內自旋加鎖,超過時間T之后加鎖失敗返回,避免死鎖。

  當Redis集群為多Master-Slave模式時,Redis根據hash算法選擇一個master嘗試加鎖。

  Redisson是通過執行lua腳本完成對Redis加鎖操作。

  • maven依賴

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.15.0</version>
        </dependency>
  • Redisson配置

@Component
public class redissonConfig {

    @Bean
    public Redisson redisson() {
        Config config = new Config();
        config.useSingleServer()
                .setAddress("redis://127.0.0.1:6379")
                .setDatabase(0);
        return (Redisson) Redisson.create(config);
    }
}
  •  代碼實現

    @PostMapping("/buyProduct3")
    public String buyProduct3() {
        String buyerName = "顧客" + Thread.currentThread().getId();
        String lockKey = "buyProductLock";
        // redisson加鎖
        RLock redissonLock = redisson.getLock(lockKey);
        try {
            //redisson設置鎖時間
            redissonLock.lock(10, TimeUnit.SECONDS);
            int stockNum = Integer.parseInt(redisTemplate.opsForValue().get("stockNum").toString());
            if (stockNum > 0) {
                redisTemplate.opsForValue().set("stockNum", --stockNum);
                System.out.println(buyerName + "下單成功,庫存剩余件數:" + stockNum);
            } else {
                System.out.println(buyerName + "下單失敗,庫存不足.");
                return buyerName + "下單失敗!";
            }
        } finally {
            //redisson釋放鎖
            redissonLock.unlock();
        }
        return buyerName + "下單成功!";
    }
  • 執行結果

 

 

 

5、總結

  • 比較

  方法一與方法二都實現了在分布式部署場景下,控制高並發業務請求下線程安全。方法一攔截並發線程,直接結束在業務邏輯執行過程中其他線程並發請求,並發吞入量較小。方法二基於Redisson可以設置並發線程等待狀態,保證每個線程請求都能完成業務,提高了系統並發吞吐量。另外,方式二的實現代碼量較少。

  方法一基於Redis指令面臨的問題:當Redis設置超時時間<應用程序執行時間,Redis分布式鎖先於程序執行完成釋放,導致當前加鎖失效。方式二Redisson分布式鎖,通過加鎖時候開啟分線程,定期(小於redis超時時間,eg:1/3)檢查redis鎖標記,如果存在再延時機制,解決了這類時間差問題。

  • 共同存在的問題

  分布式架構下,Redis也以集群模式部署,當Redis master節點加鎖成功之后,返回成功,這是Redis主節點可能宕機故障,slave從節點晉升為主節點,造成Redis鎖標識丟失,從而導致分布式鎖失效。

解決方案:

  采取Redlock或者 Zookeeper。Redisson性能更高,確保絕對數據安全采用Zookeeper(也是主從結構)。

 

6、防重復提交代碼優化

   對前文《防止重復提交解決方案》進行代碼優化,支持高並發場景線程安全控制。

  • 代碼

    @Around("preventDuplication()")
    public Object before(ProceedingJoinPoint joinPoint) throws Throwable {
        ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder
                .getRequestAttributes();
        HttpServletRequest request = attributes.getRequest();
        Assert.notNull(request, "request cannot be null.");
        //獲取執行方法
        Method method = ((MethodSignature) joinPoint.getSignature()).getMethod();
        //獲取防重復提交注解
        PreventDuplication annotation = method.getAnnotation(PreventDuplication.class);
        // 獲取token以及方法標記,生成redisKey和redisValue
        String token = request.getHeader(IdempotentConstant.TOKEN);
        String redisKey = IdempotentConstant.PREVENT_DUPLICATION_PREFIX
                .concat(token)
                .concat(getMethodSign(method, joinPoint.getArgs()));
        String redisValue = redisKey.concat(annotation.value()).concat("submit duplication");
        System.out.print("當前線程號:" + Thread.currentThread().getId());
        System.out.println("存儲redisKey: " + redisKey);
        redisValue.concat(UUID.randomUUID().toString() + Thread.currentThread().getId());
        try {
            //設置防重復操作限時標記(前置通知)
            //redisTemplate實現jedis.setnx(key,value),setIfAbsent 是java中的方法,setnx 是 redis命令中的方法
            Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(redisKey, redisValue, annotation.expireSeconds(), TimeUnit.SECONDS);
            System.out.println("當前線程號:" + Thread.currentThread().getId() + "," + "startTime:" + isSuccess);
            long startTime = System.currentTimeMillis();
            if (!isSuccess) {
                throw new RuntimeException("請勿重復提交");
            }
            System.out.println("當前線程號:" + Thread.currentThread().getId() + "," + "startTime:" + startTime + "ms耗時");
            //ProceedingJoinPoint類型參數可以決定是否執行目標方法,且環繞通知必須要有返回值,返回值即為目標方法的返回值
            Object proceed = joinPoint.proceed();
            long endStart = System.currentTimeMillis();
            System.out.println("當前線程號:" + Thread.currentThread().getId() + "," + "endStart:" + endStart + "ms耗時");
            return proceed;
        } finally {
            //釋放鎖校驗是否為當前線程
            if (redisValue.equals(redisTemplate.opsForValue().get(redisKey))) {
                //釋放鎖
                redisTemplate.delete(redisKey);
            }
        }
    }

 

7、源代碼

 本文代碼已經上傳托管至GitHub以及Gitee,有需要的讀者請自行下載。

  • GitHub:https://github.com/gavincoder/distributedlock.git
  • Gitee:https://gitee.com/gavincoderspace/distributedlock.git

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM