分布式鎖概覽
在多線程的環境下,為了保證一個代碼塊在同一時間只能由一個線程訪問,Java中我們一般可以使用synchronized語法和ReetrantLock去保證,這實際上是本地鎖的方式。但是現在公司都是流行分布式架構,在分布式環境下,如何保證不同節點的線程同步執行呢?
實際上,對於分布式場景,我們可以使用分布式鎖,它是控制分布式系統之間互斥訪問共享資源的一種方式。
比如說在一個分布式系統中,多台機器上部署了多個服務,當客戶端一個用戶發起一個數據插入請求時,如果沒有分布式鎖機制保證,那么那多台機器上的多個服務可能進行並發插入操作,導致數據重復插入,對於某些不允許有多余數據的業務來說,這就會造成問題。而分布式鎖機制就是為了解決類似這類問題,保證多個服務之間互斥的訪問共享資源,如果一個服務搶占了分布式鎖,其他服務沒獲取到鎖,就不進行后續操作。大致意思如下圖所示(不一定准確):
分布式鎖的特點
分布式鎖一般有如下的特點:
- 互斥性: 同一時刻只能有一個線程持有鎖
- 可重入性: 同一節點上的同一個線程如果獲取了鎖之后能夠再次獲取鎖
- 鎖超時:和J.U.C中的鎖一樣支持鎖超時,防止死鎖
- 高性能和高可用: 加鎖和解鎖需要高效,同時也需要保證高可用,防止分布式鎖失效
- 具備阻塞和非阻塞性:能夠及時從阻塞狀態中被喚醒
分布式鎖的實現方式
我們一般實現分布式鎖有以下幾種方式:
- 基於數據庫
- 基於Redis
- 基於zookeeper
本篇文章主要介紹基於Redis如何實現分布式鎖
Redis的分布式鎖實現
1. 利用setnx+expire命令 (錯誤的做法)
Redis的SETNX命令,setnx key value,將key設置為value,當鍵不存在時,才能成功,若鍵存在,什么也不做,成功返回1,失敗返回0 。 SETNX實際上就是SET IF NOT Exists的縮寫
因為分布式鎖還需要超時機制,所以我們利用expire命令來設置,所以利用setnx+expire命令的核心代碼如下:
public boolean tryLock(String key,String requset,int timeout) {
Long result = jedis.setnx(key, requset);
// result = 1時,設置成功,否則設置失敗
if (result == 1L) {
return jedis.expire(key, timeout) == 1L;
} else {
return false;
}
}
實際上上面的步驟是有問題的,setnx和expire是分開的兩步操作,不具有原子性,如果執行完第一條指令應用異常或者重啟了,鎖將無法過期。
一種改善方案就是使用Lua腳本來保證原子性(包含setnx和expire兩條指令)
2. 使用Lua腳本(包含setnx和expire兩條指令)
public boolean tryLock_with_lua(String key, String UniqueId, int seconds) {
String lua_scripts = "if redis.call('setnx',KEYS[1],ARGV[1]) == 1 then" +
"redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end";
List<String> keys = new ArrayList<>();
List<String> values = new ArrayList<>();
keys.add(key);
values.add(UniqueId);
values.add(String.valueOf(seconds));
Object result = jedis.eval(lua_scripts, keys, values);
//判斷是否成功
return result.equals(1L);
}
3. 使用 set key value [EX seconds][PX milliseconds][NX|XX] 命令 (正確做法)
Redis在 2.6.12 版本開始,為 SET 命令增加一系列選項:
SET key value[EX seconds][PX milliseconds][NX|XX]
- EX seconds: 設定過期時間,單位為秒
- PX milliseconds: 設定過期時間,單位為毫秒
- NX: 僅當key不存在時設置值
- XX: 僅當key存在時設置值
set命令的nx選項,就等同於setnx命令,代碼過程如下:
public boolean tryLock_with_set(String key, String UniqueId, int seconds) {
return "OK".equals(jedis.set(key, UniqueId, "NX", "EX", seconds));
}
value必須要具有唯一性,我們可以用UUID來做,設置隨機字符串保證唯一性,至於為什么要保證唯一性?假如value不是隨機字符串,而是一個固定值,那么就可能存在下面的問題:
- 1.客戶端1獲取鎖成功
- 2.客戶端1在某個操作上阻塞了太長時間
- 3.設置的key過期了,鎖自動釋放了
- 4.客戶端2獲取到了對應同一個資源的鎖
- 5.客戶端1從阻塞中恢復過來,因為value值一樣,所以執行釋放鎖操作時就會釋放掉客戶端2持有的鎖,這樣就會造成問題
所以通常來說,在釋放鎖時,我們需要對value進行驗證
釋放鎖的實現
釋放鎖時需要驗證value值,也就是說我們在獲取鎖的時候需要設置一個value,不能直接用del key這種粗暴的方式,因為直接del key任何客戶端都可以進行解鎖了,所以解鎖時,我們需要判斷鎖是否是自己的,基於value值來判斷,代碼如下:
public boolean releaseLock_with_lua(String key,String value) {
String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then " +
"return redis.call('del',KEYS[1]) else return 0 end";
return jedis.eval(luaScript, Collections.singletonList(key), Collections.singletonList(value)).equals(1L);
}
這里使用Lua腳本的方式,盡量保證原子性。
使用 set key value [EX seconds][PX milliseconds][NX|XX]
命令 看上去很OK,實際上在Redis集群的時候也會出現問題,比如說A客戶端在Redis的master節點上拿到了鎖,但是這個加鎖的key還沒有同步到slave節點,master故障,發生故障轉移,一個slave節點升級為master節點,B客戶端也可以獲取同個key的鎖,但客戶端A也已經拿到鎖了,這就導致多個客戶端都拿到鎖。
所以針對Redis集群這種情況,還有其他方案
4. Redlock算法 與 Redisson 實現
Redis作者 antirez基於分布式環境下提出了一種更高級的分布式鎖的實現Redlock,原理如下:
下面參考文章Redlock:Redis分布式鎖最牛逼的實現 和 redis.io/topics/dist…
關於Redlock算法的實現,在Redisson中我們可以使用RedissonRedLock來完成,具體使用細節可以參考大佬的文章: mp.weixin.qq.com/s/8uhYult2h…
普通實現
說道Redis分布式鎖大部分人都會想到:setnx+lua
,或者知道set key value px milliseconds nx
。后一種方式的核心實現命令如下:
- 獲取鎖(unique_value可以是UUID等)
SET resource_name unique_value NX PX 30000
- 釋放鎖(lua腳本中,一定要比較value,防止誤解鎖)
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
這種實現方式有3大要點(也是面試概率非常高的地方):
- set命令要用
set key value px milliseconds nx
; - value要具有唯一性;
- 釋放鎖時要驗證value值,不能誤解鎖;
事實上這類瑣最大的缺點就是它加鎖時只作用在一個Redis節點上,即使Redis通過sentinel保證高可用,如果這個master節點由於某些原因發生了主從切換,那么就會出現鎖丟失的情況:
- 在Redis的master節點上拿到了鎖;
- 但是這個加鎖的key還沒有同步到slave節點;
- master故障,發生故障轉移,slave節點升級為master節點;
- 導致鎖丟失。
正因為如此,Redis作者antirez基於分布式環境下提出了一種更高級的分布式鎖的實現方式:Redlock。筆者認為,Redlock也是Redis所有分布式鎖實現方式中唯一能讓面試官高潮的方式。
Redlock實現
antirez提出的redlock算法大概是這樣的:
在Redis的分布式環境中,我們假設有N個Redis master。這些節點完全互相獨立,不存在主從復制或者其他集群協調機制。我們確保將在N個實例上使用與在Redis單實例下相同方法獲取和釋放鎖。現在我們假設有5個Redis master節點,同時我們需要在5台服務器上面運行這些Redis實例,這樣保證他們不會同時都宕掉。
為了取到鎖,客戶端應該執行以下操作:
- 獲取當前Unix時間,以毫秒為單位。
- 依次嘗試從5個實例,使用相同的key和具有唯一性的value(例如UUID)獲取鎖。當向Redis請求獲取鎖時,客戶端應該設置一個網絡連接和響應超時時間,這個超時時間應該小於鎖的失效時間。例如你的鎖自動失效時間為10秒,則超時時間應該在5-50毫秒之間。這樣可以避免服務器端Redis已經掛掉的情況下,客戶端還在死死地等待響應結果。如果服務器端沒有在規定時間內響應,客戶端應該盡快嘗試去另外一個Redis實例請求獲取鎖。
- 客戶端使用當前時間減去開始獲取鎖時間(步驟1記錄的時間)就得到獲取鎖使用的時間。當且僅當從大多數(N/2+1,這里是3個節點)的Redis節點都取到鎖,並且使用的時間小於鎖失效時間時,鎖才算獲取成功。
- 如果取到了鎖,key的真正有效時間等於有效時間減去獲取鎖所使用的時間(步驟3計算的結果)。
- 如果因為某些原因,獲取鎖失敗(沒有在至少N/2+1個Redis實例取到鎖或者取鎖時間已經超過了有效時間),客戶端應該在所有的Redis實例上進行解鎖(即便某些Redis實例根本就沒有加鎖成功,防止某些節點獲取到鎖但是客戶端沒有得到響應而導致接下來的一段時間不能被重新獲取鎖)。
Redlock源碼
redisson已經有對redlock算法封裝,接下來對其用法進行簡單介紹,並對核心源碼進行分析(假設5個redis實例)。
-
POM依賴
<!-- https://mvnrepository.com/artifact/org.redisson/redisson --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.3.2</version> </dependency>
用法
首先,我們來看一下redission封裝的redlock算法實現的分布式鎖用法,非常簡單,跟重入鎖(ReentrantLock)有點類似:
Config config = new Config();
config.useSentinelServers().addSentinelAddress("127.0.0.1:6369","127.0.0.1:6379", "127.0.0.1:6389")
.setMasterName("masterName")
.setPassword("password").setDatabase(0);
RedissonClient redissonClient = Redisson.create(config);
// 還可以getFairLock(), getReadWriteLock()
RLock redLock = redissonClient.getLock("REDLOCK_KEY");
boolean isLock;
try {
isLock = redLock.tryLock();
// 500ms拿不到鎖, 就認為獲取鎖失敗。10000ms即10s是鎖失效時間。
isLock = redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS);
if (isLock) {
//TODO if get lock success, do something;
}
} catch (Exception e) {
} finally {
// 無論如何, 最后都要解鎖
redLock.unlock();
}
唯一ID
實現分布式鎖的一個非常重要的點就是set的value要具有唯一性,redisson的value是怎樣保證value的唯一性呢?答案是UUID+threadId。入口在redissonClient.getLock("REDLOCK_KEY"),源碼在Redisson.java和RedissonLock.java中:
protected final UUID id = UUID.randomUUID();
String getLockName(long threadId) {
return id + ":" + threadId;
}
獲取鎖
獲取鎖的代碼為redLock.tryLock()或者redLock.tryLock(500, 10000, TimeUnit.MILLISECONDS),兩者的最終核心源碼都是下面這段代碼,只不過前者獲取鎖的默認租約時間(leaseTime)是LOCK_EXPIRATION_INTERVAL_SECONDS,即30s:
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
internalLockLeaseTime = unit.toMillis(leaseTime);
// 獲取鎖時向5個redis實例發送的命令
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
// 首先分布式鎖的KEY不能存在,如果確實不存在,那么執行hset命令(hset REDLOCK_KEY uuid+threadId 1),並通過pexpire設置失效時間(也是鎖的租約時間)
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 如果分布式鎖的KEY已經存在,並且value也匹配,表示是當前線程持有的鎖,那么重入次數加1,並且設置失效時間
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
// 獲取分布式鎖的KEY的失效時間毫秒數
"return redis.call('pttl', KEYS[1]);",
// 這三個參數分別對應KEYS[1],ARGV[1]和ARGV[2]
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}
獲取鎖的命令中,
- KEYS[1]就是Collections.singletonList(getName()),表示分布式鎖的key,即REDLOCK_KEY;
- ARGV[1]就是internalLockLeaseTime,即鎖的租約時間,默認30s;
- ARGV[2]就是getLockName(threadId),是獲取鎖時set的唯一值,即UUID+threadId:
釋放鎖
釋放鎖的代碼為redLock.unlock(),核心源碼如下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
// 向5個redis實例都執行如下命令
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
// 如果分布式鎖KEY不存在,那么向channel發布一條消息
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; " +
"end;" +
// 如果分布式鎖存在,但是value不匹配,表示鎖已經被占用,那么直接返回
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
// 如果就是當前線程占有分布式鎖,那么將重入次數減1
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
// 重入次數減1后的值如果大於0,表示分布式鎖有重入過,那么只設置失效時間,還不能刪除
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
// 重入次數減1后的值如果為0,表示分布式鎖只獲取過1次,那么刪除這個KEY,並發布解鎖消息
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
// 這5個參數分別對應KEYS[1],KEYS[2],ARGV[1],ARGV[2]和ARGV[3]
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.unlockMessage, internalLockLeaseTime, getLockName(threadId));
}