1、為什么要使用分布式鎖
如果在一個分布式系統中,我們從數據庫中讀取一個數據,然后修改保存,這種情況很容易遇到並發問題。因為讀取和更新保存不是一個原子操作,在並發時就會導致數據的不正確。這種場景其實並不少見,比如電商秒殺活動,庫存數量的更新就會遇到。如果是單機應用,直接使用本地鎖就可以避免。如果是分布式應用,本地鎖派不上用場,這時就需要引入分布式鎖來解決。
由此可見分布式鎖的目的其實很簡單,就是為了保證多台服務器在執行某一段代碼時保證只有一台服務器執行。
2、為了保證分布式鎖的可用性,至少要確保鎖的實現要同時滿足以下幾點:
- 互斥性。在任何時刻,保證只有一個客戶端持有鎖。
- 不能出現死鎖。如果在一個客戶端持有鎖的期間,這個客戶端崩潰了,也要保證后續的其他客戶端可以上鎖。
- 保證上鎖和解鎖都是同一個客戶端。
3、一般來說,實現分布式鎖的方式有以下幾種:
- 使用MySQL,基於唯一索引。
- 使用ZooKeeper,基於臨時有序節點。
- 使用Redis,基於set命令(2.6.12 版本開始)。
本篇文章主要講解Redis的實現方式。
4、用到的redis命令
鎖的實現主要基於redis的SET命令(SET詳細解釋參考這里),我們來看SET的解釋:
SET key value [EX seconds] [PX milliseconds] [NX|XX]
- 將字符串值 value 關聯到 key 。
- 如果 key 已經持有其他值, SET 就覆寫舊值,無視類型。
- 對於某個原本帶有生存時間(TTL)的鍵來說, 當 SET 命令成功在這個鍵上執行時, 這個鍵原有的 TTL 將被清除。
可選參數
從 Redis 2.6.12 版本開始, SET 命令的行為可以通過一系列參數來修改:
EX second :設置鍵的過期時間為 second 秒。 SET key value EX second 效果等同於 SETEX key second value 。
PX millisecond :設置鍵的過期時間為 millisecond 毫秒。 SET key value PX millisecond 效果等同於 PSETEX key millisecond value 。
NX :只在鍵不存在時,才對鍵進行設置操作。 SET key value NX 效果等同於 SETNX key value 。
XX :只在鍵已經存在時,才對鍵進行設置操作。
加鎖:使用SET key value [PX milliseconds] [NX]命令,如果key不存在,設置value,並設置過期時間(加鎖成功)。如果已經存在lock(也就是有客戶端持有鎖了),則設置失敗(加鎖失敗)。
解鎖:使用del命令,通過刪除鍵值釋放鎖。釋放鎖之后,其他客戶端可以通過set命令進行加鎖。
5、上面第二項,說了分布式鎖,要考慮的問題,下面講解一下
5.1、互斥性。在任何時刻,保證只有一個客戶端持有鎖
redis命令是原子性的,只要客戶端調用redis的命令SET key value [PX milliseconds] [NX] 執行成功,就算加鎖成功了
5.2、不能出現死鎖。如果在一個客戶端持有鎖的期間,這個客戶端崩潰了,也要保證后續的其他客戶端可以上鎖。
set命令px設置了過期時間,key過期失效了,就能避免死鎖了
5.3保證上鎖和解鎖都是同一個客戶端。
釋放鎖(刪除key)的時候,只要確保是當前客戶端設置的value才去刪除key即可,采用lua腳本來實現
在Redis中,執行Lua語言是原子性,也就是說Redis執行Lua的時候是不會被中斷的,具備原子性,這個特性有助於Redis對並發數據一致性的支持。
6、java代碼實現
先把需要的jar包引入
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>2.9.3</version>
</dependency>
加鎖設置參數的實體類
import lombok.Data;
//加鎖設置的參數
@Data
public class LockParam {
//鎖的key
private String lockKey;
//嘗試獲得鎖的時間(單位:毫秒),默認值:3000毫秒
private Long tryLockTime;
//嘗試獲得鎖后,持有鎖的時間(單位:毫秒),默認值:5000毫秒
private Long holdLockTime;
public LockParam(String lockKey){
this(lockKey,1000*3L,1000*5L);
};
public LockParam(String lockKey,Long tryLockTime){
this(lockKey,tryLockTime,1000*5L);
};
public LockParam(String lockKey,Long tryLockTime,Long holdLockTime){
this.lockKey = lockKey;
this.tryLockTime = tryLockTime;
this.holdLockTime = holdLockTime;
};
}
redis分布式具體代碼實現
import lombok.extern.slf4j.Slf4j;
import redis.clients.jedis.Jedis;
import java.util.Collections;
import java.util.UUID;
/**
* redis分布式鎖
*/
@Slf4j
public class RedisLock {
//鎖key的前綴
private final static String prefix_key = "redisLock:";
//釋放鎖的lua腳本
private final static String unLockScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
//執行unLockScript腳本,釋放鎖成功值
private final static Long unLockSuccess = 1L;
//加鎖設置的參數(key值、超時時間、持有鎖的時間)
private LockParam lockParam;
//嘗試獲得鎖的截止時間【lockParam.getTryLockTime()+System.currentTimeMillis()】
private Long tryLockEndTime;
//redis加鎖的key
private String redisLockKey;
//redis加鎖的vlaus
private String redisLockValue;
//redis加鎖的成功標示
private Boolean holdLockSuccess= Boolean.FALSE;
//jedis實例
private Jedis jedis;
//獲取jedis實例
private Jedis getJedis(){
return this.jedis;
}
//關閉jedis
private void closeJedis(Jedis jedis){
jedis.close();
jedis = null;
}
public RedisLock(LockParam lockParam){
if(lockParam==null){
new RuntimeException("lockParam is null");
}
if(lockParam.getLockKey()==null || lockParam.getLockKey().trim().length()==0){
new RuntimeException("lockParam lockKey is error");
}
this.lockParam = lockParam;
this.tryLockEndTime = lockParam.getTryLockTime()+System.currentTimeMillis();
this.redisLockKey = prefix_key.concat(lockParam.getLockKey());
this.redisLockValue = UUID.randomUUID().toString().replaceAll("-","");
//todo 到時候可以更換獲取Jedis實例的實現
jedis = new Jedis("127.0.0.1",6379);
}
/**
* 加鎖
* @return 成功返回true,失敗返回false
*/
public boolean lock() {
while(true){
//判斷是否超過了,嘗試獲取鎖的時間
if(System.currentTimeMillis()>tryLockEndTime){
return false;
}
//嘗試獲取鎖
holdLockSuccess = tryLock();
if(Boolean.TRUE.equals(holdLockSuccess)){
return true;//獲取鎖成功
}
try {
//獲得鎖失敗,休眠50毫秒再去嘗試獲得鎖,避免一直請求redis,導致redis cpu飆升
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 執行一次加鎖操作:成功返回true 失敗返回false
* @return 成功返回true,失敗返回false
*/
private boolean tryLock() {
try {
String result = getJedis().set(redisLockKey,redisLockValue, "NX", "PX", lockParam.getHoldLockTime());
if ("OK".equals(result)) {
return true;
}
}catch (Exception e){
log.warn("tryLock failure redisLockKey:{} redisLockValue:{} lockParam:{}",redisLockKey,redisLockValue,lockParam,e);
}
return false;
}
/**
* 解鎖
* @return 成功返回true,失敗返回false
*/
public Boolean unlock() {
Object result = null;
try {
//獲得鎖成功,才執行lua腳本
if(Boolean.TRUE.equals(holdLockSuccess)){
//執行Lua腳本
result = getJedis().eval(unLockScript, Collections.singletonList(redisLockKey), Collections.singletonList(redisLockValue));
if (unLockSuccess.equals(result)) {//釋放成功
return true;
}
}
} catch (Exception e) {
log.warn("unlock failure redisLockKey:{} redisLockValue:{} lockParam:{} result:{}",redisLockKey,redisLockValue,lockParam,result,e);
} finally {
this.closeJedis(jedis);
}
return false;
}
}
redis分布式鎖使用
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class test {
static String lockKey = "666";
public static void main(String[] args) throws InterruptedException {
log.info("下面測試兩個線程同時,搶占鎖的結果");
Thread thread1 = new Thread(()->{
testRedisLock();
});
thread1.setName("我是線程1");
Thread thread2 = new Thread(()->{
testRedisLock();
});
thread2.setName("我是線程2");
//同時啟動線程
thread1.start();
thread2.start();
Thread.sleep(1000*20);
log.info("-----------------我是一條分割線----------------");
log.info("");
log.info("");
log.info("");
log.info("下面是測試 一個線程獲取鎖成功后,由於業務執行時間超過了設置持有鎖的時間,是否會把其他線程持有的鎖給釋放掉");
Thread thread3 = new Thread(()->{
testRedisLock2();
});
thread3.setName("我是線程3");
thread3.start();
Thread.sleep(1000*1);//暫停一秒是為了讓線程3獲的到鎖
Thread thread4 = new Thread(()->{
testRedisLock();
});
thread4.setName("我是線程4");
thread4.start();
}
public static void testRedisLock(){
LockParam lockParam = new LockParam(lockKey);
lockParam.setTryLockTime(2000L);//2秒時間嘗試獲得鎖
lockParam.setHoldLockTime(1000*10L);//獲得鎖成功后持有鎖10秒時間
RedisLock redisLock = new RedisLock(lockParam);
try {
Boolean lockFlag = redisLock.lock();
log.info("加鎖結果:{}",lockFlag);
if(lockFlag){
try {
//20秒模擬處理業務代碼時間
Thread.sleep(1000*5L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}catch (Exception e) {
log.info("testRedisLock e---->",e);
}finally {
boolean unlockResp = redisLock.unlock();
log.info("釋放鎖結果:{}",unlockResp);
}
}
public static void testRedisLock2(){
LockParam lockParam = new LockParam(lockKey);
lockParam.setTryLockTime(1000*2L);//2秒時間嘗試獲得鎖
lockParam.setHoldLockTime(1000*2L);//獲得鎖成功后持有鎖2秒時間
RedisLock redisLock = new RedisLock(lockParam);
try {
Boolean lockFlag = redisLock.lock();
log.info("加鎖結果:{}",lockFlag);
if(lockFlag){
try {
//10秒模擬處理業務代碼時間
Thread.sleep(1000*10L);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}catch (Exception e) {
log.info("testRedisLock e---->",e);
}finally {
boolean unlockResp = redisLock.unlock();
log.info("釋放鎖結果:{}",unlockResp);
}
}
}
這是代碼在執行過程中,通過redis可視化工具看到的效果,可以參考一下~
控制台日志打印結果
15:02:28.569 [main] INFO com.test.test - 下面測試兩個線程同時,搶占鎖的結果
15:02:28.645 [我是線程2] INFO com.test.test - 加鎖結果:true
15:02:30.618 [我是線程1] INFO com.test.test - 加鎖結果:false
15:02:30.620 [我是線程1] INFO com.test.test - 釋放鎖結果:false
15:02:33.652 [我是線程2] INFO com.test.test - 釋放鎖結果:true
15:02:48.614 [main] INFO com.test.test - -----------------我是一條分割線----------------
15:02:48.614 [main] INFO com.test.test -
15:02:48.614 [main] INFO com.test.test -
15:02:48.614 [main] INFO com.test.test -
15:02:48.614 [main] INFO com.test.test - 下面是測試 一個線程獲取鎖成功后,由於業務執行時間超過了設置持有鎖的時間,是否會把其他線程持有的鎖給釋放掉
15:02:48.616 [我是線程3] INFO com.test.test - 加鎖結果:true
15:02:50.645 [我是線程4] INFO com.test.test - 加鎖結果:true
15:02:55.647 [我是線程4] INFO com.test.test - 釋放鎖結果:true
15:02:58.621 [我是線程3] INFO com.test.test - 釋放鎖結果:false
- 可以看到多個線程競爭一把鎖的時候,保證了只有一個線程持有鎖
- 分割線下面的日志也能看出,一個線程持有了鎖,由於處理業務代碼時間,超過了設置持有鎖的時間,通過lua腳本釋放鎖的時候,也不會把其他線程持有的鎖給釋放掉,保證了安全釋放了鎖
7、分布式鎖 實際使用中需要注意的一些問題
假設有這樣一個場景: 有一個修改訂單狀態的接口,訂單狀態修改為失敗,就不允許在修改為其他狀態了;
在單台機器上,在代碼方法上加了synchronized來做並發控制,由於代碼邏輯比較復雜,現在它的TPS是1,一秒就只能處理一個訂單。
后面對這個系統做集群,部署了一百台,那么這個接口性能就提升了100倍了。
但是synchronized是進程級別的鎖,在集群環境下synchronized沒辦法控制其他服務器下線程並發訪問 臨界代碼了,后面就采用了分布式鎖來做並發控制。
7.1、那么使用分布鎖要注意什么了?
7.1.1、鎖粒度
如果分布式鎖的key 設置的是 redisLock:updateOrderStatus 相當於集群下對這個接口加了相同的一把大鎖,按照上面那個場景TPS就變成1了,集群部署就浪費了。
7.1.2、那么如何控制鎖粒度了?
平常我們修改訂單的時候都有訂單號,那么分布式的key可以設置為:redisLock:updateOrderStatus:{orderCode} ,{orderCode}執行的時候動態的替換成訂單編號,那么鎖粒度就控制到這條訂單了,就跟數據庫從表鎖 變成了行鎖一樣,接口支持更高的並發了。
7.1.3、獲取鎖時間
如果時間設置的太長:用戶就會等待太久才能得到響應結果
太短:頻繁獲取鎖失敗,用戶體驗性也不好
只能按照不同的業務,由開發人員來衡量設置多長的時間
7.1.4、持有鎖時間:
如果鎖粒度比較小,時間可以設置長一點,就算 業務代碼較復雜 執行比較耗時,對客戶的影響也較小 比較容易可以接受
7.1.5、難道每次想使用分布式鎖的時候都需要下面流程一樣,在編碼一次?有什么辦法能優化嗎?
1、先創建一個 分布式鎖對象;RedisLock redisLock = new RedisLock(lockParam);
2、加鎖;Boolean lockFlag = redisLock.lock();
3、finally 解鎖;redisLock.unlock();
分布式鎖使用的加鎖、解鎖 流程是固定的,沒辦法改變了;
這個流程是不是跟spring的 編程式事務一樣,spring 有編程式事務,也有聲明式事務
那么我們參考這個聲明式事務實現一個 聲明式的分布式鎖。
那聲明式的分布式鎖 優點就是:
聲明式分布式鎖:可知編程式分布式鎖每次實現都要單獨實現,但業務量大功能復雜時,使用編程式分布式鎖無疑是會增加一部分開發量,而聲明式分布式鎖不同,聲明式分布式鎖屬於無侵入式,不會影響業務邏輯的實現。
我們工作中開發 可以借助 spring aop + 自定義注解 來實現聲明式分布式鎖,在使用過程中,也需要考慮 spring aop失效的場景,避免業務 加了自定義注解后 分布式鎖沒生效的情況;