redis 高並發分布式鎖實現


一般在單體應用中,如果遇到高並發可以通過 synchronized 或者 Lock 進行加鎖,但是現在大部分應用都是采用分布式的方式進行部署,這樣像 synchronized 和 Lock 這樣的鎖就不適用了。

這個使用我們可以使用分布式鎖來實現,分布式鎖的實現方式主要有:

  • 基於數據庫的分布式鎖
  • 基於緩存的分布式鎖
  • 基於 Zookeeper 的分布式鎖

本次主要記錄一下如果是用 redis 實現分布式鎖。

首先看一個示例:
本例使用 springboot 結合 redisTemplate 實現,具體如何配置,可以參考上一邊文章:springboot 整合 redisTemplate
這里只貼核心代碼:

@Controller
@RequestMapping("/")
public class HelloV2Controller {

    @Autowired
    private RedisTemplate redisTemplate;

    @RequestMapping("delStock")
    @ResponseBody
    public String delStock() {
        Integer result = (Integer) redisTemplate.opsForValue().get("stock");
        if (result > 0) {
            int remainStock = result - 1;
            redisTemplate.opsForValue().set("stock", remainStock);
            System.out.println("Remain Stock: " + remainStock);
        } else {
            System.out.println("Remain Stock: 0");
        }
        return "success";
    }

}

  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23

在這里插入圖片描述
在 redis 中設置 stock 的值為 50,然后訪問 http://localhost:8080/delStock
每次 -1

Remain Stock: 49
Remain Stock: 48
Remain Stock: 47
Remain Stock: 46
Remain Stock: 45
Remain Stock: 44
Remain Stock: 43
Remain Stock: 42
Remain Stock: 41
Remain Stock: 40
Remain Stock: 39
Remain Stock: 38
Remain Stock: 37
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

發現其實正常的,那么使用多線程訪問呢?
使用 groboutils 模擬多線程並發,你也可以使用 jmeter

<!-- junit 多線程測試 -->
<!-- https://mvnrepository.com/artifact/net.sourceforge.groboutils/groboutils-core -->
<dependency>
	<groupId>net.sourceforge.groboutils</groupId>
	<artifactId>groboutils-core</artifactId>
	<version>5</version>
	<scope>test</scope>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

代碼:

public class Test {

    @org.junit.Test
    public void test() {

        TestRunnable runner = new TestRunnable() {
            @Override
            public void runTest() throws Throwable {
                String url = "http://localhost:8080/delStock";
                HttpGet get = new HttpGet(url);
                CloseableHttpClient client = HttpClientBuilder.create().build();
                CloseableHttpResponse response = client.execute(get);
                response.close();
            }
        };

        int runnerCount = 10;
        // Rnner數組,想當於並發多少個。
        TestRunnable[] trs = new TestRunnable[runnerCount];
        for (int i = 0; i < runnerCount; i++) {
            trs[i] = runner;
        }
        // 用於執行多線程測試用例的Runner,將前面定義的單個Runner組成的數組傳入
        MultiThreadedTestRunner mttr = new MultiThreadedTestRunner(trs);
        try {
            // 開發並發執行數組里定義的內容
            mttr.runTestRunnables();
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

打印結果:

Remain Stock: 21
Remain Stock: 21
Remain Stock: 21
Remain Stock: 21
Remain Stock: 21
Remain Stock: 21
Remain Stock: 21
Remain Stock: 21
...
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9

發現其出現了並發問題。
使用 synchronized 對其加鎖呢?

@RequestMapping("delStock")
@ResponseBody
public String delStock() {
    synchronized (HelloV2Controller.class) {
        Integer result = (Integer) redisTemplate.opsForValue().get("stock");
        if (result > 0) {
            int remainStock = result - 1;
            redisTemplate.opsForValue().set("stock", remainStock);
            System.out.println("Remain Stock: " + remainStock);
        } else {
            System.out.println("Remain Stcock: 0");
        }
    }
    return "success";
}

---------------------------------------------
Remain Stock: 49
Remain Stock: 48
Remain Stock: 47
Remain Stock: 46
Remain Stock: 45
Remain Stock: 44
Remain Stock: 43
Remain Stock: 42
Remain Stock: 41
Remain Stock: 40
Remain Stock: 39
Remain Stock: 38
Remain Stock: 37
Remain Stock: 36
.....
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

解決了並發的問題,但是這是在單體應用下場景,那要是分布式的環境下呢?
使用 nginx 搭建分布式環境,首先安裝 nginx
nginx 配置
在這里插入圖片描述
更改 springboot 啟動端口,server.port=8081
在使用 Test 測試類訪問端口號:
在這里插入圖片描述
8081 端口結果:

Remain Stock: 49
Remain Stock: 48
Remain Stock: 47
Remain Stock: 46
Remain Stock: 45
Remain Stock: 44
Remain Stock: 43
Remain Stock: 42
Remain Stock: 41
Remain Stock: 40
Remain Stock: 39
Remain Stock: 38
Remain Stock: 37
Remain Stock: 36
Remain Stock: 35
Remain Stock: 34
Remain Stock: 33
Remain Stock: 32
Remain Stock: 31
Remain Stock: 30
Remain Stock: 29
Remain Stock: 28
Remain Stock: 27
Remain Stock: 26
Remain Stock: 25
Remain Stock: 24
Remain Stock: 23
Remain Stock: 22
Remain Stock: 21
Remain Stock: 20
Remain Stock: 19
Remain Stock: 18
Remain Stock: 17
Remain Stock: 16
Remain Stock: 15
Remain Stock: 14
Remain Stock: 13
Remain Stock: 12
Remain Stock: 11
Remain Stock: 10
Remain Stock: 9
Remain Stock: 8
Remain Stock: 7
Remain Stock: 6
Remain Stock: 5
Remain Stock: 4
Remain Stock: 3
Remain Stock: 2
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48

8080 端口結果:

Remain Stock: 49
Remain Stock: 48
  • 1
  • 2

發現還是會有並發問題。

redis 實現分布式鎖

那使用 redis 實現分布式鎖:

public String delStock() {
        // synchronized (HelloV2Controller.class) { // 單體應用沒有問題,但是在分布式情況下就不適用了
        String lockKey = "lockKey";
        Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, "lockValue"); // 相當於jedis中的 jedis.setnx
        if (!lock) {
            return "false";
        }
        Integer result = (Integer) redisTemplate.opsForValue().get("stock");
        if (result > 0) {
            int remainStock = result - 1;
            redisTemplate.opsForValue().set("stock", remainStock);
            System.out.println("Remain Stock: " + remainStock);
        } else {
            System.out.println("Remain Stcock: 0");
            // }
        }
        // 加鎖記得刪除鎖
        redisTemplate.delete(lockKey);
        return "success";
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

Test 測試類並發訪問:
8081 端口結果:

Remain Stock: 499
Remain Stock: 498
Remain Stock: 497
Remain Stock: 496
Remain Stock: 493
Remain Stock: 491
Remain Stock: 487
Remain Stock: 486
Remain Stock: 485
Remain Stock: 483
Remain Stock: 481
Remain Stock: 479
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

8080 端口訪問結果:

Remain Stock: 495
Remain Stock: 494
Remain Stock: 492
Remain Stock: 490
Remain Stock: 489
Remain Stock: 488
Remain Stock: 484
Remain Stock: 482
Remain Stock: 480
Remain Stock: 478
Remain Stock: 477
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

解決了並發問題。
但是這樣就安全了嗎 ?

  1. 業務代碼發生異常
    在這里插入圖片描述
    這里發生異常,就無法刪除鎖,導致死鎖。
    在業務代碼中加入 try catch finally
    在這里插入圖片描述
  2. 服務端突然宕機
    比如拿到鎖執行到業務代碼時,應用重啟。解決方式:設置一個超時時間
    在這里插入圖片描述
    這種方式不能保證原子性,還是會出現死鎖的問題。
    redisTemplate 的方法實現了原子性的方法:
public String delStock() {
   // synchronized (HelloV2Controller.class) { // 單體應用沒有問題,但是在分布式情況下就不適用了
    String lockKey = "lockKey";
    try {
        // Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey,
        // "lockValue"); // 相當於jedis中的
        // jedis.setnx
        Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, "lockValue", 10, TimeUnit.SECONDS);
        // redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS); // 不能保證原子性
        if (!lock) {
            return "false";
        }
        Integer result = (Integer) redisTemplate.opsForValue().get("stock");
        if (result > 0) {
            int remainStock = result - 1;
            redisTemplate.opsForValue().set("stock", remainStock);
            System.out.println("Remain Stock: " + remainStock);
        } else {
            System.out.println("Remain Stcock: 0");
            // }
        }
    } catch (Exception e) {
    } finally {
        // 加鎖記得刪除鎖
        redisTemplate.delete(lockKey);
    }
    return "success";
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

但是這種情況可能出現業務代碼 10 秒還沒有執行完,redis 鎖就失效了, 甚至可能導致鎖的永久失效。

比如 線程 Thread1 拿到鎖,執行了10沒有執行完,這個時候鎖失效了。
線程 Thread2 拿到鎖,在執行到 15 秒時,Thread1 將鎖刪掉了,這個時候 Thread3 過來又拿到了鎖,從而可能導致鎖的永久失效。
那又該如何解決?
解決方法:
自己刪除自己的鎖:
在這里插入圖片描述
但是這種方式還是會導致會可能有兩個線程持有鎖,那么該如何保證只有一個線程持有鎖呢?
解決方案:
在后台開啟一個線程,給鎖續期。Redisson 實現了給鎖續期。

Redisson 實現原理

在這里插入圖片描述

<dependency>
	<groupId>org.redisson</groupId>
	<artifactId>redisson</artifactId>
	<version>3.5.0</version>
</dependency>
  • 1
  • 2
  • 3
  • 4
  • 5

config

@Bean
 public Redisson redisson() {
      Config config = new Config();
      config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);
      return (Redisson) Redisson.create(config);
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

修改 controller
在這里插入圖片描述
其實 redisson 默認實現的續期的原理,就相當於使用

Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, "lockValue", 10, TimeUnit.SECONDS);
redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS); // 不能保證原子性
  • 1
  • 2

還有一種情況,線程Thread1獲得鎖,開始執行業務邏輯,這個時候 redis 的master 突然宕機,但是還沒有同步到slave,這個時候主從集群從 slave 節點中重新選擇master節點,但是slave中並沒有 Thread1 的鎖,這個時候 Thread2 來,便可能獲得鎖。

解決方法:
保證鎖在 master 及 slave 均存在的情況下,才能加鎖成功。

如果你有什么好的實現方式,歡迎留言探討。

 
 
轉載: 
https://blog.csdn.net/wjavadog/article/details/103221855


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM