一般在單體應用中,如果遇到高並發可以通過 synchronized 或者 Lock 進行加鎖,但是現在大部分應用都是采用分布式的方式進行部署,這樣像 synchronized 和 Lock 這樣的鎖就不適用了。
這個使用我們可以使用分布式鎖來實現,分布式鎖的實現方式主要有:
- 基於數據庫的分布式鎖
- 基於緩存的分布式鎖
- 基於 Zookeeper 的分布式鎖
本次主要記錄一下如果是用 redis 實現分布式鎖。
首先看一個示例:
本例使用 springboot 結合 redisTemplate 實現,具體如何配置,可以參考上一邊文章:springboot 整合 redisTemplate
這里只貼核心代碼:
@Controller
@RequestMapping("/")
public class HelloV2Controller {
@Autowired
private RedisTemplate redisTemplate;
@RequestMapping("delStock")
@ResponseBody
public String delStock() {
Integer result = (Integer) redisTemplate.opsForValue().get("stock");
if (result > 0) {
int remainStock = result - 1;
redisTemplate.opsForValue().set("stock", remainStock);
System.out.println("Remain Stock: " + remainStock);
} else {
System.out.println("Remain Stock: 0");
}
return "success";
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
在 redis 中設置 stock 的值為 50,然后訪問 http://localhost:8080/delStock
每次 -1
Remain Stock: 49
Remain Stock: 48
Remain Stock: 47
Remain Stock: 46
Remain Stock: 45
Remain Stock: 44
Remain Stock: 43
Remain Stock: 42
Remain Stock: 41
Remain Stock: 40
Remain Stock: 39
Remain Stock: 38
Remain Stock: 37
...
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
發現其實正常的,那么使用多線程訪問呢?
使用 groboutils 模擬多線程並發,你也可以使用 jmeter
<!-- junit 多線程測試 -->
<!-- https://mvnrepository.com/artifact/net.sourceforge.groboutils/groboutils-core -->
<dependency>
<groupId>net.sourceforge.groboutils</groupId>
<artifactId>groboutils-core</artifactId>
<version>5</version>
<scope>test</scope>
</dependency>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
代碼:
public class Test {
@org.junit.Test
public void test() {
TestRunnable runner = new TestRunnable() {
@Override
public void runTest() throws Throwable {
String url = "http://localhost:8080/delStock";
HttpGet get = new HttpGet(url);
CloseableHttpClient client = HttpClientBuilder.create().build();
CloseableHttpResponse response = client.execute(get);
response.close();
}
};
int runnerCount = 10;
// Rnner數組,想當於並發多少個。
TestRunnable[] trs = new TestRunnable[runnerCount];
for (int i = 0; i < runnerCount; i++) {
trs[i] = runner;
}
// 用於執行多線程測試用例的Runner,將前面定義的單個Runner組成的數組傳入
MultiThreadedTestRunner mttr = new MultiThreadedTestRunner(trs);
try {
// 開發並發執行數組里定義的內容
mttr.runTestRunnables();
} catch (Throwable e) {
e.printStackTrace();
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
打印結果:
Remain Stock: 21
Remain Stock: 21
Remain Stock: 21
Remain Stock: 21
Remain Stock: 21
Remain Stock: 21
Remain Stock: 21
Remain Stock: 21
...
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
發現其出現了並發問題。
使用 synchronized 對其加鎖呢?
@RequestMapping("delStock")
@ResponseBody
public String delStock() {
synchronized (HelloV2Controller.class) {
Integer result = (Integer) redisTemplate.opsForValue().get("stock");
if (result > 0) {
int remainStock = result - 1;
redisTemplate.opsForValue().set("stock", remainStock);
System.out.println("Remain Stock: " + remainStock);
} else {
System.out.println("Remain Stcock: 0");
}
}
return "success";
}
---------------------------------------------
Remain Stock: 49
Remain Stock: 48
Remain Stock: 47
Remain Stock: 46
Remain Stock: 45
Remain Stock: 44
Remain Stock: 43
Remain Stock: 42
Remain Stock: 41
Remain Stock: 40
Remain Stock: 39
Remain Stock: 38
Remain Stock: 37
Remain Stock: 36
.....
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
解決了並發的問題,但是這是在單體應用下場景,那要是分布式的環境下呢?
使用 nginx 搭建分布式環境,首先安裝 nginx
nginx 配置
更改 springboot 啟動端口,server.port=8081
在使用 Test 測試類訪問端口號:
8081 端口結果:
Remain Stock: 49
Remain Stock: 48
Remain Stock: 47
Remain Stock: 46
Remain Stock: 45
Remain Stock: 44
Remain Stock: 43
Remain Stock: 42
Remain Stock: 41
Remain Stock: 40
Remain Stock: 39
Remain Stock: 38
Remain Stock: 37
Remain Stock: 36
Remain Stock: 35
Remain Stock: 34
Remain Stock: 33
Remain Stock: 32
Remain Stock: 31
Remain Stock: 30
Remain Stock: 29
Remain Stock: 28
Remain Stock: 27
Remain Stock: 26
Remain Stock: 25
Remain Stock: 24
Remain Stock: 23
Remain Stock: 22
Remain Stock: 21
Remain Stock: 20
Remain Stock: 19
Remain Stock: 18
Remain Stock: 17
Remain Stock: 16
Remain Stock: 15
Remain Stock: 14
Remain Stock: 13
Remain Stock: 12
Remain Stock: 11
Remain Stock: 10
Remain Stock: 9
Remain Stock: 8
Remain Stock: 7
Remain Stock: 6
Remain Stock: 5
Remain Stock: 4
Remain Stock: 3
Remain Stock: 2
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
8080 端口結果:
Remain Stock: 49
Remain Stock: 48
- 1
- 2
發現還是會有並發問題。
redis 實現分布式鎖
那使用 redis 實現分布式鎖:
public String delStock() {
// synchronized (HelloV2Controller.class) { // 單體應用沒有問題,但是在分布式情況下就不適用了
String lockKey = "lockKey";
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, "lockValue"); // 相當於jedis中的 jedis.setnx
if (!lock) {
return "false";
}
Integer result = (Integer) redisTemplate.opsForValue().get("stock");
if (result > 0) {
int remainStock = result - 1;
redisTemplate.opsForValue().set("stock", remainStock);
System.out.println("Remain Stock: " + remainStock);
} else {
System.out.println("Remain Stcock: 0");
// }
}
// 加鎖記得刪除鎖
redisTemplate.delete(lockKey);
return "success";
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
Test 測試類並發訪問:
8081 端口結果:
Remain Stock: 499
Remain Stock: 498
Remain Stock: 497
Remain Stock: 496
Remain Stock: 493
Remain Stock: 491
Remain Stock: 487
Remain Stock: 486
Remain Stock: 485
Remain Stock: 483
Remain Stock: 481
Remain Stock: 479
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
8080 端口訪問結果:
Remain Stock: 495
Remain Stock: 494
Remain Stock: 492
Remain Stock: 490
Remain Stock: 489
Remain Stock: 488
Remain Stock: 484
Remain Stock: 482
Remain Stock: 480
Remain Stock: 478
Remain Stock: 477
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
解決了並發問題。
但是這樣就安全了嗎 ?
- 業務代碼發生異常
這里發生異常,就無法刪除鎖,導致死鎖。
在業務代碼中加入 try catch finally - 服務端突然宕機
比如拿到鎖執行到業務代碼時,應用重啟。解決方式:設置一個超時時間
這種方式不能保證原子性,還是會出現死鎖的問題。
redisTemplate 的方法實現了原子性的方法:
public String delStock() {
// synchronized (HelloV2Controller.class) { // 單體應用沒有問題,但是在分布式情況下就不適用了
String lockKey = "lockKey";
try {
// Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey,
// "lockValue"); // 相當於jedis中的
// jedis.setnx
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, "lockValue", 10, TimeUnit.SECONDS);
// redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS); // 不能保證原子性
if (!lock) {
return "false";
}
Integer result = (Integer) redisTemplate.opsForValue().get("stock");
if (result > 0) {
int remainStock = result - 1;
redisTemplate.opsForValue().set("stock", remainStock);
System.out.println("Remain Stock: " + remainStock);
} else {
System.out.println("Remain Stcock: 0");
// }
}
} catch (Exception e) {
} finally {
// 加鎖記得刪除鎖
redisTemplate.delete(lockKey);
}
return "success";
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
但是這種情況可能出現業務代碼 10 秒還沒有執行完,redis 鎖就失效了, 甚至可能導致鎖的永久失效。
比如 線程 Thread1 拿到鎖,執行了10沒有執行完,這個時候鎖失效了。
線程 Thread2 拿到鎖,在執行到 15 秒時,Thread1 將鎖刪掉了,這個時候 Thread3 過來又拿到了鎖,從而可能導致鎖的永久失效。
那又該如何解決?
解決方法:
自己刪除自己的鎖:
但是這種方式還是會導致會可能有兩個線程持有鎖,那么該如何保證只有一個線程持有鎖呢?
解決方案:
在后台開啟一個線程,給鎖續期。Redisson 實現了給鎖續期。
Redisson 實現原理
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.5.0</version>
</dependency>
- 1
- 2
- 3
- 4
- 5
config
@Bean
public Redisson redisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.0.0.1:6379").setDatabase(0);
return (Redisson) Redisson.create(config);
}
- 1
- 2
- 3
- 4
- 5
- 6
修改 controller
其實 redisson 默認實現的續期的原理,就相當於使用
Boolean lock = redisTemplate.opsForValue().setIfAbsent(lockKey, "lockValue", 10, TimeUnit.SECONDS);
redisTemplate.expire(lockKey, 10, TimeUnit.SECONDS); // 不能保證原子性
- 1
- 2
還有一種情況,線程Thread1獲得鎖,開始執行業務邏輯,這個時候 redis 的master 突然宕機,但是還沒有同步到slave,這個時候主從集群從 slave 節點中重新選擇master節點,但是slave中並沒有 Thread1 的鎖,這個時候 Thread2 來,便可能獲得鎖。
解決方法:
保證鎖在 master 及 slave 均存在的情況下,才能加鎖成功。
如果你有什么好的實現方式,歡迎留言探討。