一、分布式鎖
分布式鎖一般有三種實現方式:1. 數據庫樂觀鎖;2. 基於Redis的分布式鎖;3. 基於ZooKeeper的分布式鎖。本篇博客將介紹第二種方式,基於Redis實現分布式鎖。
二、分布式鎖的演進
業務:電商網站賣東西需要去減庫存,本篇文章假設下的訂單數量都為1;
第1版的代碼:
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
//獲取redis中的庫存
int stock = Integer.valueOf(valueOperations.get("stock"));
if(stock > 0) {
int newStock = stock - 1;
valueOperations.set("stock", newStock + "");
System.out.println("扣減庫存成功, 剩余庫存:" + newStock);
}
else {
System.out.println("庫存已經為0,不能繼續扣減");
}
return "success";
}
}
以上代碼在高並發的場景下會產生超賣的問題,所以我們修改一下代碼(增加synchronized);
第2版代碼:
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
ValueOperations<String, String> valueOperations = redisTemplate.opsForValue();
synchronized (this) {
//獲取redis中的庫存
int stock = Integer.valueOf(valueOperations.get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//減庫存
valueOperations.set("stock", newStock + "");
System.out.println("扣減庫存成功, 剩余庫存:" + newStock);
} else {
System.out.println("庫存已經為0,不能繼續扣減");
}
}
return "success";
}
}
以上代碼在服務為多實例的情況下,還是會出現超賣的問題,這個時候就要引入分布式鎖來解決了。
第3版代碼:
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
String lockKey = "lockKey";
//加鎖: setnx
Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, "1");
if(null == isSuccess || isSuccess) {
System.out.println("服務器繁忙, 請稍后重試");
return "error";
}
//------ 執行業務邏輯 ----start------
int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//執行業務操作減庫存
redisTemplate.opsForValue().set("stock", newStock + "");
System.out.println("扣減庫存成功, 剩余庫存:" + newStock);
} else {
System.out.println("庫存已經為0,不能繼續扣減");
}
//------ 執行業務邏輯 ----end------
//釋放鎖
redisTemplate.delete(lockKey);
return "success";
}
}
以上代碼的問題:
(1)若在執行業務邏輯的過程中出現了異常,則會造成鎖不會被釋放,使其他有關的線程全部阻塞住(死鎖);我們可以把鎖釋放操作放入到 finally 語句中來解決;
(2)若在執行業務邏輯的過程中服務給掛掉了,仍然會造成鎖不會被釋放,使其他有關的線程全部阻塞住(死鎖);我們可以給 redis 的 key 增加一個超時時間(超過指定的時間則會刪除key及其對應的數據),雖然在超時時間到達之前其他有關的線程會一直阻塞住,但是這個時間比較小,且可以解決死鎖的問題,所以這個解決方案也是可以接受的。代碼如下:
第4版代碼:
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
String lockKey = "lockKey";
try {
//加鎖: setnx,expire(10秒超時)
Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
if(null == isSuccess || isSuccess) {
System.out.println("服務器繁忙, 請稍后重試");
return "error";
}
//------ 執行業務邏輯 ----start------
int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//執行業務操作減庫存
redisTemplate.opsForValue().set("stock", newStock + "");
System.out.println("扣減庫存成功, 剩余庫存:" + newStock);
} else {
System.out.println("庫存已經為0,不能繼續扣減");
}
//------ 執行業務邏輯 ----end------
} finally {
//釋放鎖 redisTemplate.delete(lockKey);
}
return "success";
}
}
以上代碼還是會出現問題:
當線程1的業務執行到一半的時候,設置的鎖超時時間到了,則鎖的key會被刪除;線程2就加鎖成功了,線程2還在執行的時候,線程1的業務執行完了,線程1接着執行刪除鎖的操作,但是線程1刪除的鎖實際上是線程2加的鎖,導致鎖失效的問題。
方法一:可以使用 “只要自己加鎖,只能自己去釋放” 來解決這個問題(第5版代碼);
第5版代碼:
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
public String deduceStock() {
String lockKey = "lockKey";
String clientId = UUID.randomUUID().toString();
try {
//加鎖: setnx,expire Boolean isSuccess = redisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 10, TimeUnit.SECONDS);
if(null == isSuccess || isSuccess) {
System.out.println("服務器繁忙, 請稍后重試");
return "error";
}
//------ 執行業務邏輯 ----start------
int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//執行業務操作減庫存
redisTemplate.opsForValue().set("stock", newStock + "");
System.out.println("扣減庫存成功, 剩余庫存:" + newStock);
} else {
System.out.println("庫存已經為0,不能繼續扣減");
}
//------ 執行業務邏輯 ----end------
} finally {
if(clientId.equals(redisTemplate.opsForValue().get(lockKey))) { //釋放鎖 redisTemplate.delete(lockKey); }
}
return "success";
}
}
以上代碼雖然解決了鎖被其他線程釋放的問題,但是還是會出現問題;當前線程的業務還沒有執行完,鎖的超時時間到了,這樣其他線程就可以去加鎖並執行業務邏輯了,這樣就有兩個線程都在執行了,有可能導致bug。
方法二:可以給鎖進行續命,每次鎖快超時的時候就給鎖重新在設置一個時間(引入另一個redis的java客戶端 Redisson)
三、分布式鎖的Redisson實現
(1)引入maven坐標;
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.13.4</version>
</dependency>
(2)增加配置文件,將Redisson注入到容器中;
@Configuration
public class RedissonConfig {
@Bean
public Redisson redisson() {
Config config = new Config();
//單機版
//config.useSingleServer().setAddress("redis://192.168.1.1:8001").setDatabase(0);
//集群版
config.useClusterServers()
.addNodeAddress("redis://192.168.1.1:8001")
.addNodeAddress("redis://192.168.1.1:8002")
.addNodeAddress("redis://192.168.1.2:8001")
.addNodeAddress("redis://192.168.1.2:8002")
.addNodeAddress("redis://192.168.1.3:8001")
.addNodeAddress("redis://192.168.1.3:8002");
return (Redisson) Redisson.create(config);
}
}
(3)分布式鎖的實現
@Service
public class RedisLockDemo {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired private Redisson redisson;
public String deduceStock() {
String lockKey = "lockKey";
RLock redissonLock = redisson.getLock(lockKey);
try {
//加鎖(超時默認30s), 實現鎖續命的功能(后台啟動一個timer, 默認每10s檢測一次是否持有鎖) redissonLock.lock();
//------ 執行業務邏輯 ----start------
int stock = Integer.valueOf(redisTemplate.opsForValue().get("stock"));
if (stock > 0) {
int newStock = stock - 1;
//執行業務操作減庫存
redisTemplate.opsForValue().set("stock", newStock + "");
System.out.println("扣減庫存成功, 剩余庫存:" + newStock);
} else {
System.out.println("庫存已經為0,不能繼續扣減");
}
//------ 執行業務邏輯 ----end------
} finally {
//解鎖 redissonLock.unlock();
}
return "success";
}
}
Redisson的實現原理

RedissonLock的使用介紹
Redisson的官網:https://redisson.org/
// 鎖默認有效時間30秒,每10秒去檢查並重新設置超時時間
void lock();
// 超過鎖有效時間 leaseTime,就會釋放鎖
void lock(long leaseTime, TimeUnit unit);
// 嘗試獲取鎖;成功則返回true,失敗則返回false
boolean tryLock();
// 不會去啟動定時任務;在 time 時間內還沒有獲取到鎖,則返回false
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 不會去啟動定時任務;當 waitTime 的時間到了,還沒有獲取到鎖則返回false;若獲取到鎖了,鎖的有效時間設置為 leaseTime
boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException;
Jedis和Redisson的比較
Jedis提供了比Redisson更豐富的操作;
Redisson底層多使用 lua 腳本實現,對原子性的操作封裝較好,尤其是在分布式鎖上的封裝;
Redis實現的分布式鎖還會出現一點問題:
線程1加了鎖去執行業務了,此時Redis的 master 掛掉了,還沒有將數據同步到 slave 上。因為集群會選舉一個新的 master 出來,但是新的 master 上並沒有這個鎖;線程2可以在新選舉產生的 master 上去加鎖,然后處理業務。
(1)針對以上問題,我們可以使用 zookeeper 去實現分布式鎖,因為它是強一致性的。但是zookeeper的性能是低於Redis,使用Redis是完全夠了。
(2)當然,對於以上的問題,我們也可以使用 RedLock 去解決Redis上的那個問題,RedLock 實現的原理:給多個Redis節點發送加鎖的消息,只有超過一半以上的節點加鎖成功才算加鎖成功。
但是不推薦使用RedLock,當前的 RedLock 是有bug的,它的實現原理和 zookeeper 是差不多的。
高並發的高性能的Redis
怎么在高並發的場景去實現一個高性能的分布式鎖呢?
電商網站在大促的時候並發量很大:
(1)若搶購不是同一個商品,則可以增加Redis集群的cluster來實現,因為不是同一個商品,所以通過計算 key 的hash會落到不同的 cluster上;
(2)若搶購的是同一個商品,則計算key的hash值會落同一個cluster上,所以加機器也是么有用的。
我們可以使用庫存分段鎖的方式去實現。
分段鎖
假如產品1有200個庫存,我們可以將這200個庫存分為10個段存儲(每段20個),每段存儲到一個cluster上;將key使用hash計算,使這些key最后落在不同的cluster上。
每個下單請求鎖了一個庫存分段,然后在業務邏輯里面,就對數據庫或者是Redis中的那個分段庫存進行操作即可,包括查庫存 -> 判斷庫存是否充足 -> 扣減庫存。
可以參照 ConcurrentHashMap 的源碼去實現,它使用的就是分段鎖。
高性能分布式鎖參考鏈接:https://blog.csdn.net/eluanshi12/article/details/84616173
