文章很長,而且持續更新,建議收藏起來,慢慢讀!瘋狂創客圈總目錄 博客園版 為您奉上珍貴的學習資源 :
免費贈送 :《尼恩Java面試寶典》 持續更新+ 史上最全 + 面試必備 2000頁+ 面試必備 + 大廠必備 +漲薪必備
免費贈送 經典圖書:《Java高並發核心編程(卷1)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:《Java高並發核心編程(卷2)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:《Java高並發核心編程(卷3)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:《尼恩Java面試寶典 最新版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 資源寶庫: Java 必備 百度網盤資源大合集 價值>10000元 加尼恩領取
下面是與redis面試相關的系列博文,建議大家系統化、系統化的學習
redis cluster 集群 HA 原理和實操(史上最全、面試必備)
跨JVM的線程安全問題
在單體的應用開發場景中,在多線程的環境下,涉及並發同步的時候,為了保證一個代碼塊在同一時間只能由一個線程訪問,我們一般可以使用synchronized語法和ReetrantLock去保證,這實際上是本地鎖的方式。
也就是說,在同一個JVM內部,大家往往采用synchronized或者Lock的方式來解決多線程間的安全問題。但在分布式集群工作的開發場景中,在JVM之間,那么就需要一種更加高級的鎖機制,來處理種跨JVM進程之間的線程安全問題.
解決方案是:使用分布式鎖
總之,對於分布式場景,我們可以使用分布式鎖,它是控制分布式系統之間互斥訪問共享資源的一種方式。
比如說在一個分布式系統中,多台機器上部署了多個服務,當客戶端一個用戶發起一個數據插入請求時,如果沒有分布式鎖機制保證,那么那多台機器上的多個服務可能進行並發插入操作,導致數據重復插入,對於某些不允許有多余數據的業務來說,這就會造成問題。而分布式鎖機制就是為了解決類似這類問題,保證多個服務之間互斥的訪問共享資源,如果一個服務搶占了分布式鎖,其他服務沒獲取到鎖,就不進行后續操作。
大致意思如下圖所示(不一定准確):

何為分布式鎖?
何為分布式鎖?
- 當在分布式模型下,數據只有一份(或有限制),此時需要利用鎖的技術控制某一時刻修改數據的進程數。
- 用一個狀態值表示鎖,對鎖的占用和釋放通過狀態值來標識。
分布式鎖的條件:
- 互斥性。在任意時刻,只有一個客戶端能持有鎖。
- 不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續其他客戶端能加鎖。
- 具有容錯性。只要大部分的 Redis 節點正常運行,客戶端就可以加鎖和解鎖。
- 解鈴還須系鈴人。加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了。
分布式鎖的實現:
分布式鎖的實現由很多種,文件鎖、數據庫、redis等等,比較多;分布式鎖常見的多種實現方式:
- 數據庫悲觀鎖、
- 數據庫樂觀鎖;
- 基於Redis的分布式鎖;
- 基於ZooKeeper的分布式鎖。
在實踐中,還是redis做分布式鎖性能會高一些
數據庫悲觀鎖
所謂悲觀鎖,悲觀鎖是對數據被的修改持悲觀態度(認為數據在被修改的時候一定會存在並發問題),因此在整個數據處理過程中將數據鎖定。
悲觀鎖的實現,往往依靠數據庫提供的鎖機制(也只有數據庫層提供的鎖機制才能真正保證數據訪問的排他性,否則,即使在應用層中實現了加鎖機制,也無法保證外部系統不會修改數據)。
數據庫的行鎖、表鎖、排他鎖等都是悲觀鎖,這里以行鎖為例,進行介紹。以我們常用的MySQL為例,我們通過使用select...for update語句, 執行該語句后,會在表上加持行鎖,一直到事務提交,解除行鎖。
使用場景舉例:
在秒殺案例中,生成訂單和扣減庫存的操作,可以通過商品記錄的行鎖,進行保護。們通過使用select...for update語句,在查詢商品表庫存時將該條記錄加鎖,待下單減庫存完成后,再釋放鎖。
示例的SQL如下:
//0.開始事務
begin;
//1.查詢出商品信息
select stockCount from seckill_good where id=1 for update;
//2.根據商品信息生成訂單
insert into seckill_order (id,good_id) values (null,1);
//3.修改商品stockCount減一
update seckill_good set stockCount=stockCount-1 where id=1;
//4.提交事務
commit;
以上,在對id = 1的記錄修改前,先通過for update的方式進行加鎖,然后再進行修改。這就是比較典型的悲觀鎖策略。
如果以上修改庫存的代碼發生並發,同一時間只有一個線程可以開啟事務並獲得id=1的鎖,其它的事務必須等本次事務提交之后才能執行。這樣我們可以保證當前的數據不會被其它事務修改。
我們使用select_for_update,另外一定要寫在事務中.
注意:要使用悲觀鎖,我們必須關閉mysql數據庫中自動提交的屬性,命令set autocommit=0;即可關閉,因為MySQL默認使用autocommit模式,也就是說,當你執行一個更新操作后,MySQL會立刻將結果進行提交。
悲觀鎖的實現,往往依靠數據庫提供的鎖機制。在數據庫中,悲觀鎖的流程如下:
- 在對記錄進行修改前,先嘗試為該記錄加上排他鎖(exclusive locking)。
- 如果加鎖失敗,說明該記錄正在被修改,那么當前查詢可能要等待或者拋出異常。具體響應方式由開發者根據實際需要決定。
- 如果成功加鎖,那么就可以對記錄做修改,事務完成后就會解鎖了。
- 其間如果有其他事務對該記錄做加鎖的操作,都要等待當前事務解鎖或直接拋出異常。
數據庫樂觀鎖
使用樂觀鎖就不需要借助數據庫的鎖機制了。
樂觀鎖的概念中其實已經闡述了他的具體實現細節:主要就是兩個步驟:沖突檢測和數據更新。其實現方式有一種比較典型的就是Compare and Swap(CAS)技術。
CAS是項樂觀鎖技術,當多個線程嘗試使用CAS同時更新同一個變量時,只有其中一個線程能更新變量的值,而其它線程都失敗,失敗的線程並不會被掛起,而是被告知這次競爭中失敗,並可以再次嘗試。
CAS的實現中,在表中增加一個version字段,操作前先查詢version信息,在數據提交時檢查version字段是否被修改,如果沒有被修改則進行提交,否則認為是過期數據。
比如前面的扣減庫存問題,通過樂觀鎖可以實現如下:
//1.查詢出商品信息
select stockCount, version from seckill_good where id=1;
//2.根據商品信息生成訂單
insert into seckill_order (id,good_id) values (null,1);
//3.修改商品庫存
update seckill_good set stockCount=stockCount-1, version = version+1 where id=1, version=version;
以上,我們在更新之前,先查詢一下庫存表中當前版本(version),然后在做update的時候,以version 作為一個修改條件。
當我們提交更新的時候,判斷數據庫表對應記錄的當前version與第一次取出來的version進行比對,如果數據庫表當前version與第一次取出來的version相等,則予以更新,否則認為是過期數據。
CAS 樂觀鎖有兩個問題:
(1) CAS 存在一個比較重要的問題,即ABA問題. 解決的辦法是version字段順序遞增。
(2) 樂觀鎖的方式,在高並發時,只有一個線程能執行成功,會造成大量的失敗,這給用戶的體驗顯然是很不好的。
Zookeeper分布式鎖
除了在數據庫層面加分布式鎖,通常還可以使用以下更高性能、更高可用的分布式鎖:
- 分布式緩存(如redis)鎖
- 分布式協調(如zookeeper)鎖
有關zookeeper分布式鎖的原理和實現,具體請參見下面的博客:
Zookeeper 分布式鎖 (圖解+秒懂+史上最全)
或者閱讀筆者的《Java高並發核心編程(卷1加強版)》
Redis分布式鎖
本文重點介紹Redis分布式鎖,分為兩個維度進行介紹:
(1)基於Jedis手工造輪子分布式鎖
(2)介紹Redission 分布式鎖的使用和原理。
分布式鎖一般有如下的特點:
- 互斥性: 同一時刻只能有一個線程持有鎖
- 可重入性: 同一節點上的同一個線程如果獲取了鎖之后能夠再次獲取鎖
- 鎖超時:和J.U.C中的鎖一樣支持鎖超時,防止死鎖
- 高性能和高可用: 加鎖和解鎖需要高效,同時也需要保證高可用,防止分布式鎖失效
- 具備阻塞和非阻塞性:能夠及時從阻塞狀態中被喚醒
手工造輪子:基於Jedis 的API實現分布式鎖
我們首先講解 Jedis 普通分布式鎖實現,並且是純手工的模式,從最為基礎的Redis命令開始。
只有充分了解與分布式鎖相關的普通Redis命令,才能更好的了解高級的Redis分布式鎖的實現,因為高級的分布式鎖的實現完全基於普通Redis命令。
Redis幾種架構
Redis發展到現在,幾種常見的部署架構有:
- 單機模式;
- 主從模式;
- 哨兵模式;
- 集群模式;
從分布式鎖的角度來說, 無論是單機模式、主從模式、哨兵模式、集群模式,其原理都是類同的。 只是主從模式、哨兵模式、集群模式的更加的高可用、或者更加高並發。
所以,接下來先基於單機模式,基於Jedis手工造輪子實現自己的分布式鎖。
首先看兩個命令:
Redis分布式鎖機制,主要借助setnx和expire兩個命令完成。
setnx命令:
SETNX 是SET if Not eXists的簡寫。將 key 的值設為 value,當且僅當 key 不存在; 若給定的 key 已經存在,則 SETNX 不做任何動作。
下面為客戶端使用示例:
127.0.0.1:6379> set lock "unlock"
OK
127.0.0.1:6379> setnx lock "unlock"
(integer) 0
127.0.0.1:6379> setnx lock "lock"
(integer) 0
127.0.0.1:6379>
expire命令:
expire命令為 key 設置生存時間,當 key 過期時(生存時間為 0 ),它會被自動刪除. 其格式為:
EXPIRE key seconds
下面為客戶端使用示例:
127.0.0.1:6379> expire lock 10
(integer) 1
127.0.0.1:6379> ttl lock
8
基於Jedis API的分布式鎖的總體流程:
通過Redis的setnx、expire命令可以實現簡單的鎖機制:
- key不存在時創建,並設置value和過期時間,返回值為1;成功獲取到鎖;
- 如key存在時直接返回0,搶鎖失敗;
- 持有鎖的線程釋放鎖時,手動刪除key; 或者過期時間到,key自動刪除,鎖釋放。
線程調用setnx方法成功返回1認為加鎖成功,其他線程要等到當前線程業務操作完成釋放鎖后,才能再次調用setnx加鎖成功。

以上簡單redis分布式鎖的問題:
如果出現了這么一個問題:如果setnx是成功的,但是expire設置失敗,一旦出現了釋放鎖失敗,或者沒有手工釋放,那么這個鎖永遠被占用,其他線程永遠也搶不到鎖。
所以,需要保障setnx和expire兩個操作的原子性,要么全部執行,要么全部不執行,二者不能分開。
解決的辦法有兩種:
- 使用set的命令時,同時設置過期時間,不再單獨使用 expire命令
- 使用lua腳本,將加鎖的命令放在lua腳本中原子性的執行
簡單加鎖:使用set的命令時,同時設置過期時間
使用set的命令時,同時設置過期時間的示例如下:
127.0.0.1:6379> set unlock "234" EX 100 NX
(nil)
127.0.0.1:6379>
127.0.0.1:6379> set test "111" EX 100 NX
OK
這樣就完美的解決了分布式鎖的原子性; set 命令的完整格式:
set key value [EX seconds] [PX milliseconds] [NX|XX]
EX seconds:設置失效時長,單位秒
PX milliseconds:設置失效時長,單位毫秒
NX:key不存在時設置value,成功返回OK,失敗返回(nil)
XX:key存在時設置value,成功返回OK,失敗返回(nil)
使用set命令實現加鎖操作,先展示加鎖的簡單代碼實習,再帶大家慢慢解釋為什么這樣實現。
加鎖的簡單代碼實現
package com.crazymaker.springcloud.standard.lock;
@Slf4j
@Data
@AllArgsConstructor
public class JedisCommandLock {
private RedisTemplate redisTemplate;
private static final String LOCK_SUCCESS = "OK";
private static final String SET_IF_NOT_EXIST = "NX";
private static final String SET_WITH_EXPIRE_TIME = "PX";
/**
* 嘗試獲取分布式鎖
* @param jedis Redis客戶端
* @param lockKey 鎖
* @param requestId 請求標識
* @param expireTime 超期時間
* @return 是否獲取成功
*/
public static boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {
String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
if (LOCK_SUCCESS.equals(result)) {
return true;
}
return false;
}
}
可以看到,我們加鎖用到了Jedis的set Api:
jedis.set(String key, String value, String nxxx, String expx, int time)
這個set()方法一共有五個形參:
-
第一個為key,我們使用key來當鎖,因為key是唯一的。
-
第二個為value,我們傳的是requestId,很多童鞋可能不明白,有key作為鎖不就夠了嗎,為什么還要用到value?原因就是我們在上面講到可靠性時,分布式鎖要滿足第四個條件解鈴還須系鈴人,通過給value賦值為requestId,我們就知道這把鎖是哪個請求加的了,在解鎖的時候就可以有依據。
requestId可以使用
UUID.randomUUID().toString()方法生成。 -
第三個為nxxx,這個參數我們填的是NX,意思是SET IF NOT EXIST,即當key不存在時,我們進行set操作;若key已經存在,則不做任何操作;
-
第四個為expx,這個參數我們傳的是PX,意思是我們要給這個key加一個過期的設置,具體時間由第五個參數決定。
-
第五個為time,與第四個參數相呼應,代表key的過期時間。
總的來說,執行上面的set()方法就只會導致兩種結果:
- 當前沒有鎖(key不存在),那么就進行加鎖操作,並對鎖設置個有效期,同時value表示加鎖的客戶端。
- 已有鎖存在,不做任何操作。
心細的童鞋就會發現了,我們的加鎖代碼滿足前面描述的四個條件中的三個。
-
首先,set()加入了NX參數,可以保證如果已有key存在,則函數不會調用成功,也就是只有一個客戶端能持有鎖,滿足互斥性。
-
其次,由於我們對鎖設置了過期時間,即使鎖的持有者后續發生崩潰而沒有解鎖,鎖也會因為到了過期時間而自動解鎖(即key被刪除),不會被永遠占用(而發生死鎖)。
-
最后,因為我們將value賦值為requestId,代表加鎖的客戶端請求標識,那么在客戶端在解鎖的時候就可以進行校驗是否是同一個客戶端。
-
由於我們只考慮Redis單機部署的場景,所以容錯性我們暫不考慮。
基於Jedis 的API實現簡單解鎖代碼
還是先展示代碼,再帶大家慢慢解釋為什么這樣實現。
解鎖的簡單代碼實現:
package com.crazymaker.springcloud.standard.lock;
@Slf4j
@Data
@AllArgsConstructor
public class JedisCommandLock {
private static final Long RELEASE_SUCCESS = 1L;
/**
* 釋放分布式鎖
* @param jedis Redis客戶端
* @param lockKey 鎖
* @param requestId 請求標識
* @return 是否釋放成功
*/
public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {
String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));
if (RELEASE_SUCCESS.equals(result)) {
return true;
}
return false;
}
}
那么這段Lua代碼的功能是什么呢?
其實很簡單,首先獲取鎖對應的value值,檢查是否與requestId相等,如果相等則刪除鎖(解鎖)。
第一行代碼,我們寫了一個簡單的Lua腳本代碼。
第二行代碼,我們將Lua代碼傳到jedis.eval()方法里,並使參數KEYS[1]賦值為lockKey,ARGV[1]賦值為requestId。eval()方法是將Lua代碼交給Redis服務端執行。
那么為什么要使用Lua語言來實現呢?
因為要確保上述操作是原子性的。那么為什么執行eval()方法可以確保原子性,源於Redis的特性.
簡單來說,就是在eval命令執行Lua代碼的時候,Lua代碼將被當成一個命令去執行,並且直到eval命令執行完成,Redis才會執行其他命
錯誤示例1
最常見的解鎖代碼就是直接使用 jedis.del() 方法刪除鎖,這種不先判斷鎖的擁有者而直接解鎖的方式,會導致任何客戶端都可以隨時進行解鎖,即使這把鎖不是它的。
public static void wrongReleaseLock1(Jedis jedis, String lockKey) {
jedis.del(lockKey);
}
錯誤示例2
這種解鎖代碼乍一看也是沒問題,甚至我之前也差點這樣實現,與正確姿勢差不多,唯一區別的是分成兩條命令去執行,代碼如下:
public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) {
// 判斷加鎖與解鎖是不是同一個客戶端
if (requestId.equals(jedis.get(lockKey))) {
// 若在此時,這把鎖突然不是這個客戶端的,則會誤解鎖
jedis.del(lockKey);
}
}
再造輪子:基於Lua腳本實現分布式鎖
lua腳本的好處
前面提到,在redis中執行lua腳本,有如下的好處:
那么為什么要使用Lua語言來實現呢?
因為要確保上述操作是原子性的。那么為什么執行eval()方法可以確保原子性,源於Redis的特性.
簡單來說,就是在eval命令執行Lua代碼的時候,Lua代碼將被當成一個命令去執行,並且直到eval命令執行完成,Redis才會執行其他命
所以:
大部分的開源框架(如 redission)中的分布式鎖組件,都是用純lua腳本實現的。
題外話: lua腳本是高並發、高性能的必備腳本語言
有關lua的詳細介紹,請參見以下書籍:
那么,我們也來模擬一下
基於純Lua腳本的分布式鎖的執行流程
加鎖和刪除鎖的操作,使用純lua進行封裝,保障其執行時候的原子性。
基於純Lua腳本實現分布式鎖的執行流程,大致如下:

加鎖的Lua腳本: lock.lua
--- -1 failed
--- 1 success
---
local key = KEYS[1]
local requestId = KEYS[2]
local ttl = tonumber(KEYS[3])
local result = redis.call('setnx', key, requestId)
if result == 1 then
--PEXPIRE:以毫秒的形式指定過期時間
redis.call('pexpire', key, ttl)
else
result = -1;
-- 如果value相同,則認為是同一個線程的請求,則認為重入鎖
local value = redis.call('get', key)
if (value == requestId) then
result = 1;
redis.call('pexpire', key, ttl)
end
end
-- 如果獲取鎖成功,則返回 1
return result
解鎖的Lua腳本: unlock.lua:
--- -1 failed
--- 1 success
-- unlock key
local key = KEYS[1]
local requestId = KEYS[2]
local value = redis.call('get', key)
if value == requestId then
redis.call('del', key);
return 1;
end
return -1
兩個文件,放在資源文件夾下備用:

在Java中調用lua腳本,完成加鎖操作
package com.crazymaker.springcloud.standard.lock;
import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.common.util.IOUtil;
import com.crazymaker.springcloud.standard.context.SpringContextUtil;
import com.crazymaker.springcloud.standard.lua.ScriptHolder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import java.util.ArrayList;
import java.util.List;
@Slf4j
public class InnerLock {
private RedisTemplate redisTemplate;
public static final Long LOCKED = Long.valueOf(1);
public static final Long UNLOCKED = Long.valueOf(1);
public static final int EXPIRE = 2000;
String key;
String requestId; // lockValue 鎖的value ,代表線程的uuid
/**
* 默認為2000ms
*/
long expire = 2000L;
private volatile boolean isLocked = false;
private RedisScript lockScript;
private RedisScript unLockScript;
public InnerLock(String lockKey, String requestId) {
this.key = lockKey;
this.requestId = requestId;
lockScript = ScriptHolder.getLockScript();
unLockScript = ScriptHolder.getUnlockScript();
}
/**
* 搶奪鎖
*/
public void lock() {
if (null == key) {
return;
}
try {
List<String> redisKeys = new ArrayList<>();
redisKeys.add(key);
redisKeys.add(requestId);
redisKeys.add(String.valueOf(expire));
Long res = (Long) getRedisTemplate().execute(lockScript, redisKeys);
isLocked = false;
} catch (Exception e) {
e.printStackTrace();
throw BusinessException.builder().errMsg("搶鎖失敗").build();
}
}
/**
* 有返回值的搶奪鎖
*
* @param millisToWait
*/
public boolean lock(Long millisToWait) {
if (null == key) {
return false;
}
try {
List<String> redisKeys = new ArrayList<>();
redisKeys.add(key);
redisKeys.add(requestId);
redisKeys.add(String.valueOf(millisToWait));
Long res = (Long) getRedisTemplate().execute(lockScript, redisKeys);
return res != null && res.equals(LOCKED);
} catch (Exception e) {
e.printStackTrace();
throw BusinessException.builder().errMsg("搶鎖失敗").build();
}
}
//釋放鎖
public void unlock() {
if (key == null || requestId == null) {
return;
}
try {
List<String> redisKeys = new ArrayList<>();
redisKeys.add(key);
redisKeys.add(requestId);
Long res = (Long) getRedisTemplate().execute(unLockScript, redisKeys);
// boolean unlocked = res != null && res.equals(UNLOCKED);
} catch (Exception e) {
e.printStackTrace();
throw BusinessException.builder().errMsg("釋放鎖失敗").build();
}
}
private RedisTemplate getRedisTemplate() {
if(null==redisTemplate)
{
redisTemplate= (RedisTemplate) SpringContextUtil.getBean("stringRedisTemplate");
}
return redisTemplate;
}
}
在Java中調用lua腳本,完成加鎖操作
下一步,實現Lock接口, 完成JedisLock的分布式鎖。
其加鎖操作,通過調用 lock.lua腳本完成,代碼如下:
package com.crazymaker.springcloud.standard.lock;
import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.common.util.ThreadUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@Slf4j
@Data
@AllArgsConstructor
public class JedisLock implements Lock {
private RedisTemplate redisTemplate;
RedisScript<Long> lockScript = null;
RedisScript<Long> unLockScript = null;
public static final int DEFAULT_TIMEOUT = 2000;
public static final Long LOCKED = Long.valueOf(1);
public static final Long UNLOCKED = Long.valueOf(1);
public static final Long WAIT_GAT = Long.valueOf(200);
public static final int EXPIRE = 2000;
String key;
String lockValue; // lockValue 鎖的value ,代表線程的uuid
/**
* 默認為2000ms
*/
long expire = 2000L;
public JedisLock(String lockKey, String lockValue) {
this.key = lockKey;
this.lockValue = lockValue;
}
private volatile boolean isLocked = false;
private Thread thread;
/**
* 獲取一個分布式鎖 , 超時則返回失敗
*
* @return 獲鎖成功 - true | 獲鎖失敗 - false
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
//本地可重入
if (isLocked && thread == Thread.currentThread()) {
return true;
}
expire = unit != null ? unit.toMillis(time) : DEFAULT_TIMEOUT;
long startMillis = System.currentTimeMillis();
Long millisToWait = expire;
boolean localLocked = false;
int turn = 1;
while (!localLocked) {
localLocked = this.lockInner(expire);
if (!localLocked) {
millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait > 0L) {
/**
* 還沒有超時
*/
ThreadUtil.sleepMilliSeconds(WAIT_GAT);
log.info("睡眠一下,重新開始,turn:{},剩余時間:{}", turn++, millisToWait);
} else {
log.info("搶鎖超時");
return false;
}
} else {
isLocked = true;
localLocked = true;
}
}
return isLocked;
}
/**
* 有返回值的搶奪鎖
*
* @param millisToWait
*/
public boolean lockInner(Long millisToWait) {
if (null == key) {
return false;
}
try {
List<String> redisKeys = new ArrayList<>();
redisKeys.add(key);
redisKeys.add(lockValue);
redisKeys.add(String.valueOf(millisToWait));
Long res = (Long) redisTemplate.execute(lockScript, redisKeys);
return res != null && res.equals(LOCKED);
} catch (Exception e) {
e.printStackTrace();
throw BusinessException.builder().errMsg("搶鎖失敗").build();
}
}
}
實現JUC的Lock顯示鎖接口,實現一個簡單的分布式鎖
其解鎖操作,通過調用unlock.lua腳本完成,代碼如下:
package com.crazymaker.springcloud.standard.lock;
import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.common.util.ThreadUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@Slf4j
@Data
@AllArgsConstructor
public class JedisLock implements Lock {
private RedisTemplate redisTemplate;
RedisScript<Long> lockScript = null;
RedisScript<Long> unLockScript = null;
//釋放鎖
@Override
public void unlock() {
if (key == null || requestId == null) {
return;
}
try {
List<String> redisKeys = new ArrayList<>();
redisKeys.add(key);
redisKeys.add(requestId);
Long res = (Long) redisTemplate.execute(unLockScript, redisKeys);
} catch (Exception e) {
e.printStackTrace();
throw BusinessException.builder().errMsg("釋放鎖失敗").build();
}
}
}
編寫RedisLockService用於管理JedisLock
編寫個分布式鎖服務,用於加載lua腳本,創建 分布式鎖,代碼如下:
package com.crazymaker.springcloud.standard.lock;
import com.crazymaker.springcloud.common.util.IOUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@Slf4j
@Data
public class RedisLockService
{
private RedisTemplate redisTemplate;
static String lockLua = "script/lock.lua";
static String unLockLua = "script/unlock.lua";
static RedisScript<Long> lockScript = null;
static RedisScript<Long> unLockScript = null;
{
String script = IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),lockLua);
// String script = FileUtil.readString(lockLua, Charset.forName("UTF-8" ));
if(StringUtils.isEmpty(script))
{
log.error("lua load failed:"+lockLua);
}
lockScript = new DefaultRedisScript<>(script, Long.class);
// script = FileUtil.readString(unLockLua, Charset.forName("UTF-8" ));
script = IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),unLockLua);
if(StringUtils.isEmpty(script))
{
log.error("lua load failed:"+unLockLua);
}
unLockScript = new DefaultRedisScript<>(script, Long.class);
}
public RedisLockService(RedisTemplate redisTemplate)
{
this.redisTemplate = redisTemplate;
}
public Lock getLock(String lockKey, String lockValue) {
JedisLock lock=new JedisLock(lockKey,lockValue);
lock.setRedisTemplate(redisTemplate);
lock.setLockScript(lockScript);
lock.setUnLockScript(unLockScript);
return lock;
}
}
測試用例
接下來,終於可以上測試用例了
package com.crazymaker.springcloud.lock;
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {DemoCloudApplication.class})
// 指定啟動類
public class RedisLockTest {
@Resource
RedisLockService redisLockService;
private ExecutorService pool = Executors.newFixedThreadPool(10);
@Test
public void testLock() {
int threads = 10;
final int[] count = {0};
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(() ->
{
String lockValue = UUID.randomUUID().toString();
try {
Lock lock = redisLockService.getLock("test:lock:1", lockValue);
boolean locked = lock.tryLock(10, TimeUnit.SECONDS);
if (locked) {
for (int j = 0; j < 1000; j++) {
count[0]++;
}
log.info("count = " + count[0]);
lock.unlock();
} else {
System.out.println("搶鎖失敗");
}
} catch (Exception e) {
e.printStackTrace();
}
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("10個線程每個累加1000為: = " + count[0]);
//輸出統計結果
float time = System.currentTimeMillis() - start;
System.out.println("運行的時長為(ms):" + time);
System.out.println("每一次執行的時長為(ms):" + time / count[0]);
}
}
執行用例,結果如下:
2021-05-04 23:02:11.900 INFO 22120 --- [pool-1-thread-7] c.c.springcloud.lock.RedisLockTest LN:50 count = 6000
2021-05-04 23:02:11.901 INFO 22120 --- [pool-1-thread-1] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新開始,turn:3,剩余時間:9585
2021-05-04 23:02:11.902 INFO 22120 --- [pool-1-thread-1] c.c.springcloud.lock.RedisLockTest LN:50 count = 7000
2021-05-04 23:02:12.100 INFO 22120 --- [pool-1-thread-4] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新開始,turn:3,剩余時間:9586
2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新開始,turn:3,剩余時間:9585
2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-8] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新開始,turn:3,剩余時間:9585
2021-05-04 23:02:12.101 INFO 22120 --- [pool-1-thread-4] c.c.springcloud.lock.RedisLockTest LN:50 count = 8000
2021-05-04 23:02:12.102 INFO 22120 --- [pool-1-thread-8] c.c.springcloud.lock.RedisLockTest LN:50 count = 9000
2021-05-04 23:02:12.304 INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock LN:81 睡眠一下,重新開始,turn:4,剩余時間:9383
2021-05-04 23:02:12.307 INFO 22120 --- [pool-1-thread-5] c.c.springcloud.lock.RedisLockTest LN:50 count = 10000
10個線程每個累加1000為: = 10000
運行的時長為(ms):827.0
每一次執行的時長為(ms):0.0827
STW導致的鎖過期問題
下面有一個簡單的使用鎖的例子,在10秒內占着鎖:
//寫數據到文件
function writeData(filename, data) {
boolean locked = lock.tryLock(10, TimeUnit.SECONDS);
if (!locked) {
throw 'Failed to acquire lock';
}
try {
//將數據寫到文件
var file = storage.readFile(filename);
var updated = updateContents(file, data);
storage.writeFile(filename, updated);
} finally {
lock.unlock();
}
}
問題是:如果在寫文件過程中,發生了 fullGC,並且其時間跨度較長, 超過了10秒, 那么,分布式就自動釋放了。
在此過程中,client2 搶到鎖,寫了文件。
client1 的fullGC完成后,也繼續寫文件,注意,此時client1 的並沒有占用鎖,此時寫入會導致文件數據錯亂,發生線程安全問題。
這就是STW導致的鎖過期問題。
STW導致的鎖過期問題,具體如下圖所示:

STW導致的鎖過期問題,大概的解決方案,有:
1: 模擬CAS樂觀鎖的方式,增加版本號
2:watch dog自動延期機制
1: 模擬CAS樂觀鎖的方式,增加版本號(如下圖中的token)

此方案如果要實現,需要調整業務邏輯,與之配合,所以會入侵代碼。
2:watch dog自動延期機制
客戶端1加鎖的鎖key默認生存時間才30秒,如果超過了30秒,客戶端1還想一直持有這把鎖,怎么辦呢?
簡單!只要客戶端1一旦加鎖成功,就會啟動一個watch dog看門狗,他是一個后台線程,會每隔10秒檢查一下,如果客戶端1還持有鎖key,那么就會不斷的延長鎖key的生存時間。
redission,采用的就是這種方案, 此方案不會入侵業務代碼。
注意:
單機版的watch dog 並不能解決 STW的過期問題, 需要分布式版本的 watch dog, 獨立的看門狗服務。
鎖刪除之后, 取消看門狗服務的 對應的key記錄, 當然,這就使得系統變得復雜, 還要保證看門狗服務的高並發、高可用、數據一致性的問題。
為啥推薦使用Redission
作為 Java 開發人員,我們若想在程序中集成 Redis,必須使用 Redis 的第三方庫。目前大家使用的最多的第三方庫是jedis。
和SpringCloud gateway一樣,Redisson也是基於Netty實現的,是更高性能的第三方庫。 所以,這里推薦大家使用Redission替代 jedis。
在使用Redission之前,建議大家先掌握Netty的知識。
推薦大家閱讀被很多小伙伴評價為史上最為易懂的NIO、Netty書籍:《Java高並發核心編程(卷1)》
Redisson簡介
Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象,還實現了可重入鎖(Reentrant Lock)、公平鎖(Fair Lock、聯鎖(MultiLock)、 紅鎖(RedLock)、 讀寫鎖(ReadWriteLock)等,還提供了許多分布式服務。

Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上。

Redisson與Jedis的對比
1.概況對比
Jedis是Redis的java實現的客戶端,其API提供了比較全面的的Redis命令的支持,Redisson實現了分布式和可擴展的的java數據結構,和Jedis相比,功能較為簡單,不支持字符串操作,不支持排序,事物,管道,分區等Redis特性。Redisson的宗旨是促進使用者對Redis的關注分離,從而讓使用者能夠將精力更集中的放在處理業務邏輯上。
2.可伸縮性
Jedis使用阻塞的I/O,且其方法調用都是同步的,程序流程要等到sockets處理完I/O才能執行,不支持異步,Jedis客戶端實例不是線程安全的,所以需要通過連接池來使用Jedis。
Redisson使用非阻塞的I/O和基於Netty框架的事件驅動的通信層,其方法調用時異步的。Redisson的API是線程安全的,所以操作單個Redisson連接來完成各種操作。
3.第三方框架整合
Redisson在Redis的基礎上實現了java緩存標准規范;Redisson還提供了Spring Session回話管理器的實現。
Redission 的源碼地址:
github: https://github.com/redisson/redisson#quick-start

特性 & 功能:
-
支持 Redis 單節點(single)模式、哨兵(sentinel)模式、主從(Master/Slave)模式以及集群(Redis Cluster)模式
-
程序接口調用方式采用異步執行和異步流執行兩種方式
-
數據序列化,Redisson 的對象編碼類是用於將對象進行序列化和反序列化,以實現對該對象在 Redis 里的讀取和存儲
-
單個集合數據分片,在集群模式下,Redisson 為單個 Redis 集合類型提供了自動分片的功能
-
提供多種分布式對象,如:Object Bucket,Bitset,AtomicLong,Bloom Filter 和 HyperLogLog 等
-
提供豐富的分布式集合,如:Map,Multimap,Set,SortedSet,List,Deque,Queue 等
-
分布式鎖和同步器的實現,可重入鎖(Reentrant Lock),公平鎖(Fair Lock),聯鎖(MultiLock),紅鎖(Red Lock),信號量(Semaphonre),可過期性信號鎖(PermitExpirableSemaphore)等
-
提供先進的分布式服務,如分布式遠程服務(Remote Service),分布式實時對象(Live Object)服務,分布式執行服務(Executor Service),分布式調度任務服務(Schedule Service)和分布式映射歸納服務(MapReduce)
Redisson的使用
如何安裝 Redisson
安裝 Redisson 最便捷的方法是使用 Maven 或者 Gradle:
•Maven
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.11.4</version>
</dependency>
•Gradle
compile group: 'org.redisson', name: 'redisson', version: '3.11.4'
目前 Redisson 最新版是 3.11.4,當然你也可以通過搜索 Maven 中央倉庫 mvnrepository[1] 來找到 Redisson 的各種版本。
獲取RedissonClient對象
RedissonClient有多種模式,主要的模式有:
-
單節點模式
-
哨兵模式
-
主從模式
-
集群模式
首先介紹單節點模式。
單節點模式的程序化配置方法,大致如下:
Config config = new Config();
config.useSingleServer().setAddress("redis://myredisserver:6379");
RedissonClient redisson = Redisson.create(config);xxxxxxxxxx Config config = new Config();config.useSingleServer().setAddress("redis://myredisserver:6379");RedissonClient redisson = Redisson.create(config);// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();
SingleServerConfig singleConfig = config.useSingleServer();
SingleServerConfig類的設置參數如下:
address(節點地址)
可以通過
host:port的格式來指定節點地址。subscriptionConnectionMinimumIdleSize(發布和訂閱連接的最小空閑連接數)
默認值:
1用於發布和訂閱連接的最小保持連接數(長連接)。Redisson內部經常通過發布和訂閱來實現許多功能。長期保持一定數量的發布訂閱連接是必須的。
subscriptionConnectionPoolSize(發布和訂閱連接池大小)
默認值:
50用於發布和訂閱連接的連接池最大容量。連接池的連接數量自動彈性伸縮。
connectionMinimumIdleSize(最小空閑連接數)
默認值:
32最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時寫入反應速度。
connectionPoolSize(連接池大小)
默認值:
64連接池最大容量。連接池的連接數量自動彈性伸縮。
dnsMonitoring(是否啟用DNS監測)
默認值:
false在啟用該功能以后,Redisson將會監測DNS的變化情況。
dnsMonitoringInterval(DNS監測時間間隔,單位:毫秒)
默認值:
5000監測DNS的變化情況的時間間隔。
idleConnectionTimeout(連接空閑超時,單位:毫秒)
默認值:
10000如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,並從連接池里去掉。時間單位是毫秒。
connectTimeout(連接超時,單位:毫秒)
默認值:
10000同節點建立連接時的等待超時。時間單位是毫秒。
timeout(命令等待超時,單位:毫秒)
默認值:
3000等待節點回復命令的時間。該時間從命令發送成功時開始計時。
retryAttempts(命令失敗重試次數)
默認值:
3如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時。
retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:
1500在一條命令發送失敗以后,等待重試發送的時間間隔。時間單位是毫秒。
reconnectionTimeout(重新連接時間間隔,單位:毫秒)
默認值:
3000當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔。時間單位是毫秒。
failedAttempts(執行失敗最大次數)
默認值:
3在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試。
database(數據庫編號)
默認值:
0嘗試連接的數據庫編號。
password(密碼)
默認值:
null用於節點身份驗證的密碼。
subscriptionsPerConnection(單個連接最大訂閱數量)
默認值:
5每個連接的最大訂閱數量。
clientName(客戶端名稱)
默認值:
null在Redis節點里顯示的客戶端名稱。
sslEnableEndpointIdentification(啟用SSL終端識別)
默認值:
true開啟SSL終端識別能力。
sslProvider(SSL實現方式)
默認值:
JDK確定采用哪種方式(JDK或OPENSSL)來實現SSL連接。
sslTruststore(SSL信任證書庫路徑)
默認值:
null指定SSL信任證書庫的路徑。
sslTruststorePassword(SSL信任證書庫密碼)
默認值:
null指定SSL信任證書庫的密碼。
sslKeystore(SSL鑰匙庫路徑)
默認值:
null指定SSL鑰匙庫的路徑。
sslKeystorePassword(SSL鑰匙庫密碼)
默認值:
null指定SSL鑰匙庫的密碼。
SpringBoot整合Redisson
Redisson有多種模式,首先介紹單機模式的整合。
一、導入Maven依賴
<!-- redisson-springboot -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.11.4</version>
</dependency>
二、核心配置文件
spring:
redis:
host: 127.0.0.1
port: 6379
database: 0
timeout: 5000
三、添加配置類
RedissonConfig.java
import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RedissonConfig {
@Autowired
private RedisProperties redisProperties;
@Bean
public RedissonClient redissonClient() {
Config config = new Config();
String redisUrl = String.format("redis://%s:%s", redisProperties.getHost() + "", redisProperties.getPort() + "");
config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword());
config.useSingleServer().setDatabase(3);
return Redisson.create(config);
}
}
自定義starter
由於redission可以有多種模式,處於學習的目的,將多種模式封裝成一個start,可以學習一下starter的制作。

封裝一個RedissonManager,通過策略模式,根據不同的配置類型,創建 RedissionConfig實例,然后創建RedissionClient對象。

使用RBucket操作分布式對象
Redission模擬了Java的面向對象編程思想,可以簡單理解為一切皆為對象。
每一個 Redisson 對象 實現了RObject and RExpirable 兩個interfaces.
Usage example:
RObject object = redisson.get...()
object.sizeInMemory();
object.delete();
object.rename("newname");
object.isExists();
// catch expired event
object.addListener(new ExpiredObjectListener() {
...
});
// catch delete event
object.addListener(new DeletedObjectListener() {
...
});
每一個Redisson 對象的名字,就是 Redis中的 Key.
RMap map = redisson.getMap("mymap");
map.getName(); // = mymap
可以通過 RKeys 接口操作Redis中的keys.
Usage example:
RKeys keys = redisson.getKeys();
Iterable<String> allKeys = keys.getKeys();
Iterable<String> foundedKeys = keys.getKeysByPattern('key*');
long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3");
long deletedKeysAmount = keys.deleteByPattern("test?");
String randomKey = keys.randomKey();
long keysAmount = keys.count();
keys.flushall();
keys.flushdb();
Redisson通過RBucket接口代表可以訪問任何類型的基礎對象,或者普通對象。
RBucket有一系列的工具方法,如compareAndSet(),get(),getAndDelete(),getAndSet(),set(),size(),trySet()等等,用於設值/取值/獲取尺寸。
RBucket普通對象的最大大小,為512兆字節。
RBucket<AnyObject> bucket = redisson.getBucket("anyObject");
bucket.set(new AnyObject(1));
AnyObject obj = bucket.get();
bucket.trySet(new AnyObject(3));
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
bucket.getAndSet(new AnyObject(6));
下面是一個完整的實例:
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testRBucketExamples() {
// 默認連接上 127.0.0.1:6379
RedissonClient client = redissonManager.getRedisson();
// RList 繼承了 java.util.List 接口
RBucket<String> rstring = client.getBucket("redission:test:bucket:string");
rstring.set("this is a string");
RBucket<UserDTO> ruser = client.getBucket("redission:test:bucket:user");
UserDTO dto = new UserDTO();
dto.setToken(UUID.randomUUID().toString());
ruser.set(dto);
System.out.println("string is: " + rstring.get());
System.out.println("dto is: " + ruser.get());
client.shutdown();
}
}
運行上面的代碼時,可以獲得以下輸出:
string is: this is a string
dto is: UserDTO(id=null, userId=null, username=null, password=null, nickname=null, token=183b6eeb-65a8-4b2a-80c6-cf17c08332ce, createTime=null, updateTime=null, headImgUrl=null, mobile=null, sex=null, enabled=null, type=null, openId=null, isDel=false)

使用 RList 操作 Redis 列表
下面的代碼簡單演示了如何在 Redisson 中使用 RList 對象。RList 是 Java 的 List 集合的分布式並發實現。
考慮以下代碼:
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testListExamples() {
// 默認連接上 127.0.0.1:6379
RedissonClient client = redissonManager.getRedisson();
// RList 繼承了 java.util.List 接口
RList<String> nameList = client.getList("redission:test:nameList");
nameList.clear();
nameList.add("張三");
nameList.add("李四");
nameList.add("王五");
nameList.remove(-1);
System.out.println("List size: " + nameList.size());
boolean contains = nameList.contains("李四");
System.out.println("Is list contains name '李四': " + contains);
nameList.forEach(System.out::println);
client.shutdown();
}
}
運行上面的代碼時,可以獲得以下輸出:
List size: 2
Is list contains name '李四': true
張三
李四

使用 RMap 操作 Redis 哈希
Redisson 還包括 RMap,它是 Java Map 集合的分布式並發實現,考慮以下代碼:
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testListExamples() {
// 默認連接上 127.0.0.1:6379
RedissonClient client = redissonManager.getRedisson();
// RMap 繼承了 java.util.concurrent.ConcurrentMap 接口
RMap<String, Object> map = client.getMap("redission:test:personalMap");
map.put("name", "張三");
map.put("address", "北京");
map.put("age", new Integer(50));
System.out.println("Map size: " + map.size());
boolean contains = map.containsKey("age");
System.out.println("Is map contains key 'age': " + contains);
String value = String.valueOf(map.get("name"));
System.out.println("Value mapped by key 'name': " + value);
client.shutdown();
}
}
運行上面的代碼時,將會看到以下輸出:
Map size: 3
Is map contains key 'age': true
Value mapped by key 'name': 張三

執行 Lua腳本
Lua是一種開源、簡單易學、輕量小巧的腳本語言,用標准C語言編寫。
其設計的目的就是為了嵌入應用程序中,從而為應用程序提供靈活的擴展和定制功能。
Redis從2.6版本開始支持Lua腳本,Redis使用Lua可以:
- 原子操作。Redis會將整個腳本作為一個整體執行,不會被中斷。可以用來批量更新、批量插入
- 減少網絡開銷。多個Redis操作合並為一個腳本,減少網絡時延
- 代碼復用。客戶端發送的腳本可以存儲在Redis中,其他客戶端可以根據腳本的id調用。
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testLuaExamples() {
// 默認連接上 127.0.0.1:6379
RedissonClient redisson = redissonManager.getRedisson();
redisson.getBucket("redission:test:foo").set("bar");
String r = redisson.getScript().eval(RScript.Mode.READ_ONLY,
"return redis.call('get', 'redission:test:foo')", RScript.ReturnType.VALUE);
System.out.println("foo: " + r);
// 通過預存的腳本進行同樣的操作
RScript s = redisson.getScript();
// 首先將腳本加載到Redis
String sha1 = s.scriptLoad("return redis.call('get', 'redission:test:foo')");
// 返回值 res == 282297a0228f48cd3fc6a55de6316f31422f5d17
System.out.println("sha1: " + sha1);
// 再通過SHA值調用腳本
Future<Object> r1 = redisson.getScript().evalShaAsync(RScript.Mode.READ_ONLY,
sha1,
RScript.ReturnType.VALUE,
Collections.emptyList());
try {
System.out.println("res: " + r1.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
client.shutdown();
}
}
運行上面的代碼時,將會看到以下輸出:
foo: bar
sha1: 282297a0228f48cd3fc6a55de6316f31422f5d17
res: bar

使用 RLock 實現 Redis 分布式鎖
RLock 是 Java 中可重入鎖的分布式實現,下面的代碼演示了 RLock 的用法:
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testLockExamples() {
// 默認連接上 127.0.0.1:6379
RedissonClient redisson = redissonManager.getRedisson();
// RLock 繼承了 java.util.concurrent.locks.Lock 接口
RLock lock = redisson.getLock("redission:test:lock:1");
final int[] count = {0};
int threads = 10;
ExecutorService pool = Executors.newFixedThreadPool(10);
CountDownLatch countDownLatch = new CountDownLatch(threads);
long start = System.currentTimeMillis();
for (int i = 0; i < threads; i++) {
pool.submit(() ->
{
for (int j = 0; j < 1000; j++) {
lock.lock();
count[0]++;
lock.unlock();
}
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("10個線程每個累加1000為: = " + count[0]);
//輸出統計結果
float time = System.currentTimeMillis() - start;
System.out.println("運行的時長為:" + time);
System.out.println("每一次執行的時長為:" + time/count[0]);
}
}
此代碼將產生以下輸出:
10個線程每個累加1000為: = 10000
運行的時長為:14172.0
每一次執行的時長為:1.4172
使用 RAtomicLong 實現 Redis 原子操作
RAtomicLong 是 Java 中 AtomicLong 類的分布式“替代品”,用於在並發環境中保存長值。以下示例代碼演示了 RAtomicLong 的用法:
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testRAtomicLongExamples() {
// 默認連接上 127.0.0.1:6379
RedissonClient redisson = redissonManager.getRedisson();
RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong");
// 線程數
final int threads = 10;
// 每條線程的執行輪數
final int turns = 1000;
ExecutorService pool = Executors.newFixedThreadPool(threads);
for (int i = 0; i < threads; i++)
{
pool.submit(() ->
{
try
{
for (int j = 0; j < turns; j++)
{
atomicLong.incrementAndGet();
}
} catch (Exception e)
{
e.printStackTrace();
}
});
}
ThreadUtil.sleepSeconds(5);
System.out.println("atomicLong: " + atomicLong.get());
redisson.shutdown();
}
}
此代碼的輸出將是:
atomicLong: 10000

整長型累加器(LongAdder)
基於Redis的Redisson分布式整長型累加器(LongAdder)采用了與java.util.concurrent.atomic.LongAdder類似的接口。通過利用客戶端內置的LongAdder對象,為分布式環境下遞增和遞減操作提供了很高得性能。據統計其性能最高比分布式AtomicLong對象快 12000 倍。
完美適用於分布式統計計量場景。下面是RLongAdder的使用案例:
RLongAdder atomicLong = redisson.getLongAdder("myLongAdder");
atomicLong.add(12);
atomicLong.increment();
atomicLong.decrement();
atomicLong.sum();
以下示例代碼演示了 RLongAdder 的用法:
public class RedissionTest {
@Resource
RedissonManager redissonManager;
@Test
public void testRAtomicLongExamples() {
// 默認連接上 127.0.0.1:6379
RedissonClient redisson = redissonManager.getRedisson();
RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong");
// 線程數
final int threads = 10;
// 每條線程的執行輪數
final int turns = 1000;
ExecutorService pool = Executors.newFixedThreadPool(threads);
for (int i = 0; i < threads; i++)
{
pool.submit(() ->
{
try
{
for (int j = 0; j < turns; j++)
{
atomicLong.incrementAndGet();
}
} catch (Exception e)
{
e.printStackTrace();
}
});
}
ThreadUtil.sleepSeconds(5);
System.out.println("atomicLong: " + atomicLong.get());
redisson.shutdown();
}
}
此代碼將產生以下輸出:
longAdder: 10000
運行的時長為:5085.0
每一次執行的時長為:0.5085
當不再使用整長型累加器對象的時候應該自行手動銷毀,如果Redisson對象被關閉(shutdown)了,則不用手動銷毀。
RLongAdder atomicLong = ...
atomicLong.destroy();
序列化
Redisson的對象編碼類是用於將對象進行序列化和反序列化,以實現對該對象在Redis里的讀取和存儲。Redisson提供了以下幾種的對象編碼應用,以供大家選擇:
| 編碼類名稱 | 說明 |
|---|---|
org.redisson.codec.JsonJacksonCodec |
Jackson JSON 編碼 默認編碼 |
org.redisson.codec.AvroJacksonCodec |
Avro 一個二進制的JSON編碼 |
org.redisson.codec.SmileJacksonCodec |
Smile 另一個二進制的JSON編碼 |
org.redisson.codec.CborJacksonCodec |
CBOR 又一個二進制的JSON編碼 |
org.redisson.codec.MsgPackJacksonCodec |
MsgPack 再來一個二進制的JSON編碼 |
org.redisson.codec.IonJacksonCodec |
Amazon Ion 亞馬遜的Ion編碼,格式與JSON類似 |
org.redisson.codec.KryoCodec |
Kryo 二進制對象序列化編碼 |
org.redisson.codec.SerializationCodec |
JDK序列化編碼 |
org.redisson.codec.FstCodec |
FST 10倍於JDK序列化性能而且100%兼容的編碼 |
org.redisson.codec.LZ4Codec |
LZ4 壓縮型序列化對象編碼 |
org.redisson.codec.SnappyCodec |
Snappy 另一個壓縮型序列化對象編碼 |
org.redisson.client.codec.JsonJacksonMapCodec |
基於Jackson的映射類使用的編碼。可用於避免序列化類的信息,以及用於解決使用byte[]遇到的問題。 |
org.redisson.client.codec.StringCodec |
純字符串編碼(無轉換) |
org.redisson.client.codec.LongCodec |
純整長型數字編碼(無轉換) |
org.redisson.client.codec.ByteArrayCodec |
字節數組編碼 |
org.redisson.codec.CompositeCodec |
用來組合多種不同編碼在一起 |
由Redisson默認的編碼器為二進制編碼器,為了序列化后的內容可見,需要使用Json文本序列化編碼工具類。Redisson提供了編碼器 JsonJacksonCodec,作為Json文本序列化編碼工具類。
問題是:JsonJackson在序列化有雙向引用的對象時,會出現無限循環異常。而fastjson在檢查出雙向引用后會自動用引用符$ref替換,終止循環。
所以,一些特殊場景中:用fastjson能 正常序列化到redis,而JsonJackson則拋出無限循環異常。
為了序列化后的內容可見,所以不用redission其他自帶的,自行實現fastjson編碼器:
package com.crayon.distributedredissionspringbootstarter.codec;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import org.redisson.client.codec.BaseCodec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;
import java.io.IOException;
public class FastjsonCodec extends BaseCodec {
private final Encoder encoder = in -> {
ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
try {
ByteBufOutputStream os = new ByteBufOutputStream(out);
JSON.writeJSONString(os, in, SerializerFeature.WriteClassName);
return os.buffer();
} catch (IOException e) {
out.release();
throw e;
} catch (Exception e) {
out.release();
throw new IOException(e);
}
};
private final Decoder<Object> decoder = (buf, state) ->
JSON.parseObject(new ByteBufInputStream(buf), Object.class);
@Override
public Decoder<Object> getValueDecoder() {
return decoder;
}
@Override
public Encoder getValueEncoder() {
return encoder;
}
}
替換的方法如下:
*/
@Slf4j
public class StandaloneConfigImpl implements RedissonConfigService {
@Override
public Config createRedissonConfig(RedissonConfig redissonConfig) {
Config config = new Config();
try {
String address = redissonConfig.getAddress();
String password = redissonConfig.getPassword();
int database = redissonConfig.getDatabase();
String redisAddr = GlobalConstant.REDIS_CONNECTION_PREFIX.getConstant_value() + address;
config.useSingleServer().setAddress(redisAddr);
config.useSingleServer().setDatabase(database);
//密碼可以為空
if (!StringUtils.isEmpty(password)) {
config.useSingleServer().setPassword(password);
}
log.info("初始化[單機部署]方式Config,redisAddress:" + address);
// config.setCodec( new FstCodec());
config.setCodec( new FastjsonCodec());
} catch (Exception e) {
log.error("單機部署 Redisson init error", e);
}
return config;
}
}
哨兵模式
哨兵模式即sentinel模式,配置Redis哨兵服務的官方文檔在這里。
哨兵模式實現代碼和單機模式幾乎一樣,唯一的不同就是Config的構造.
程序化配置哨兵模式的方法如下:
Config config = new Config();
config.useSentinelServers()
.setMasterName("mymaster")
// use "rediss://" for SSL connection
.addSentinelAddress("redis://127.0.0.1:26389", "redis://127.0.0.1:26379")
.addSentinelAddress("redis://127.0.0.1:26319");
RedissonClient redisson = Redisson.create(config);
Redisson的哨兵模式的使用方法如下:
SentinelServersConfig sentinelConfig = config.useSentinelServers();
SentinelServersConfig配置參數如下:
配置Redis哨兵服務的官方文檔在這里。Redisson的哨兵模式的使用方法如下:
SentinelServersConfig sentinelConfig = config.useSentinelServers();
SentinelServersConfig類的設置參數如下:dnsMonitoringInterval(DNS監控間隔,單位:毫秒)
默認值:
5000用來指定檢查節點DNS變化的時間間隔。使用的時候應該確保JVM里的DNS數據的緩存時間保持在足夠低的范圍才有意義。用
-1來禁用該功能。masterName(主服務器的名稱)
主服務器的名稱是哨兵進程中用來監測主從服務切換情況的。
addSentinelAddress(添加哨兵節點地址)
可以通過
host:port的格式來指定哨兵節點的地址。多個節點可以一次性批量添加。readMode(讀取操作的負載均衡模式)
默認值:
SLAVE(只在從服務節點里讀取)注:在從服務節點里讀取的數據說明已經至少有兩個節點保存了該數據,確保了數據的高可用性。
設置讀取操作選擇節點的模式。可用值為:
SLAVE- 只在從服務節點里讀取。MASTER- 只在主服務節點里讀取。MASTER_SLAVE- 在主從服務節點里都可以讀取。subscriptionMode(訂閱操作的負載均衡模式)
默認值:
SLAVE(只在從服務節點里訂閱)設置訂閱操作選擇節點的模式。可用值為:
SLAVE- 只在從服務節點里訂閱。MASTER- 只在主服務節點里訂閱。loadBalancer(負載均衡算法類的選擇)
默認值:
org.redisson.connection.balancer.RoundRobinLoadBalancer在使用多個Redis服務節點的環境里,可以選用以下幾種負載均衡方式選擇一個節點:
org.redisson.connection.balancer.WeightedRoundRobinBalancer- 權重輪詢調度算法org.redisson.connection.balancer.RoundRobinLoadBalancer- 輪詢調度算法org.redisson.connection.balancer.RandomLoadBalancer- 隨機調度算法subscriptionConnectionMinimumIdleSize(從節點發布和訂閱連接的最小空閑連接數)
默認值:
1多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的最小保持連接數(長連接)。Redisson內部經常通過發布和訂閱來實現許多功能。長期保持一定數量的發布訂閱連接是必須的。
subscriptionConnectionPoolSize(從節點發布和訂閱連接池大小)
默認值:
50多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的連接池最大容量。連接池的連接數量自動彈性伸縮。
slaveConnectionMinimumIdleSize(從節點最小空閑連接數)
默認值:
32多從節點的環境里,每個 從服務節點里用於普通操作(非 發布和訂閱)的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時讀取反映速度。
slaveConnectionPoolSize(從節點連接池大小)
默認值:
64多從節點的環境里,每個 從服務節點里用於普通操作(非 發布和訂閱)連接的連接池最大容量。連接池的連接數量自動彈性伸縮。
masterConnectionMinimumIdleSize(主節點最小空閑連接數)
默認值:
32多從節點的環境里,每個 主節點的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時寫入反應速度。
masterConnectionPoolSize(主節點連接池大小)
默認值:
64主節點的連接池最大容量。連接池的連接數量自動彈性伸縮。
idleConnectionTimeout(連接空閑超時,單位:毫秒)
默認值:
10000如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,並從連接池里去掉。時間單位是毫秒。
connectTimeout(連接超時,單位:毫秒)
默認值:
10000同任何節點建立連接時的等待超時。時間單位是毫秒。
timeout(命令等待超時,單位:毫秒)
默認值:
3000等待節點回復命令的時間。該時間從命令發送成功時開始計時。
retryAttempts(命令失敗重試次數)
默認值:
3如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時。
retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:
1500在一條命令發送失敗以后,等待重試發送的時間間隔。時間單位是毫秒。
reconnectionTimeout(重新連接時間間隔,單位:毫秒)
默認值:
3000當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔。時間單位是毫秒。
failedAttempts(執行失敗最大次數)
默認值:
3在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試。
database(數據庫編號)
默認值:
0嘗試連接的數據庫編號。
password(密碼)
默認值:
null用於節點身份驗證的密碼。
subscriptionsPerConnection(單個連接最大訂閱數量)
默認值:
5每個連接的最大訂閱數量。
clientName(客戶端名稱)
默認值:
null在Redis節點里顯示的客戶端名稱。
sslEnableEndpointIdentification(啟用SSL終端識別)
默認值:
true開啟SSL終端識別能力。
sslProvider(SSL實現方式)
默認值:
JDK確定采用哪種方式(JDK或OPENSSL)來實現SSL連接。
sslTruststore(SSL信任證書庫路徑)
默認值:
null指定SSL信任證書庫的路徑。
sslTruststorePassword(SSL信任證書庫密碼)
默認值:
null指定SSL信任證書庫的密碼。
sslKeystore(SSL鑰匙庫路徑)
默認值:
null指定SSL鑰匙庫的路徑。
sslKeystorePassword(SSL鑰匙庫密碼)
默認值:
null指定SSL鑰匙庫的密碼。
通過屬性文件,配置的示例如下:
---
sentinelServersConfig:
idleConnectionTimeout: 10000
connectTimeout: 10000
timeout: 3000
retryAttempts: 3
retryInterval: 1500
failedSlaveReconnectionInterval: 3000
failedSlaveCheckInterval: 60000
password: null
subscriptionsPerConnection: 5
clientName: null
loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
subscriptionConnectionMinimumIdleSize: 1
subscriptionConnectionPoolSize: 50
slaveConnectionMinimumIdleSize: 24
slaveConnectionPoolSize: 64
masterConnectionMinimumIdleSize: 24
masterConnectionPoolSize: 64
readMode: "SLAVE"
subscriptionMode: "SLAVE"
sentinelAddresses:
- "redis://127.0.0.1:26379"
- "redis://127.0.0.1:26389"
masterName: "mymaster"
database: 0
threads: 16
nettyThreads: 32
codec: !<org.redisson.codec.MarshallingCodec> {}
transportMode: "NIO"
主從模式
介紹配置Redis主從服務組態的文檔在這里.
程序化配置主從模式的方法如下:
Config config = new Config();
config.useMasterSlaveServers()
// use "rediss://" for SSL connection
.setMasterAddress("redis://127.0.0.1:6379")
.addSlaveAddress("redis://127.0.0.1:6389", "redis://127.0.0.1:6332", "redis://127.0.0.1:6419")
.addSlaveAddress("redis://127.0.0.1:6399");
RedissonClient redisson = Redisson.create(config);
主從模式使用到MasterSlaveServersConfig :
MasterSlaveServersConfig masterSlaveConfig = config.useMasterSlaveServers();
MasterSlaveServersConfig 類的設置參數如下:
dnsMonitoringInterval(DNS監控間隔,單位:毫秒)
默認值:
5000用來指定檢查節點DNS變化的時間間隔。使用的時候應該確保JVM里的DNS數據的緩存時間保持在足夠低的范圍才有意義。用
-1來禁用該功能。masterAddress(主節點地址)
可以通過
host:port的格式來指定主節點地址。addSlaveAddress(添加從主節點地址)
可以通過
host:port的格式來指定從節點的地址。多個節點可以一次性批量添加。readMode(讀取操作的負載均衡模式)
默認值:
SLAVE(只在從服務節點里讀取)注:在從服務節點里讀取的數據說明已經至少有兩個節點保存了該數據,確保了數據的高可用性。
設置讀取操作選擇節點的模式。可用值為:
SLAVE- 只在從服務節點里讀取。MASTER- 只在主服務節點里讀取。MASTER_SLAVE- 在主從服務節點里都可以讀取。subscriptionMode(訂閱操作的負載均衡模式)
默認值:
SLAVE(只在從服務節點里訂閱)設置訂閱操作選擇節點的模式。可用值為:
SLAVE- 只在從服務節點里訂閱。MASTER- 只在主服務節點里訂閱。loadBalancer(負載均衡算法類的選擇)
默認值:
org.redisson.connection.balancer.RoundRobinLoadBalancer在使用多個Redis服務節點的環境里,可以選用以下幾種負載均衡方式選擇一個節點:
org.redisson.connection.balancer.WeightedRoundRobinBalancer- 權重輪詢調度算法org.redisson.connection.balancer.RoundRobinLoadBalancer- 輪詢調度算法org.redisson.connection.balancer.RandomLoadBalancer- 隨機調度算法subscriptionConnectionMinimumIdleSize(從節點發布和訂閱連接的最小空閑連接數)
默認值:
1多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的最小保持連接數(長連接)。Redisson內部經常通過發布和訂閱來實現許多功能。長期保持一定數量的發布訂閱連接是必須的。
subscriptionConnectionPoolSize(從節點發布和訂閱連接池大小)
默認值:
50多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的連接池最大容量。連接池的連接數量自動彈性伸縮。
slaveConnectionMinimumIdleSize(從節點最小空閑連接數)
默認值:
32多從節點的環境里,每個 從服務節點里用於普通操作(非 發布和訂閱)的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時讀取反映速度。
slaveConnectionPoolSize(從節點連接池大小)
默認值:
64多從節點的環境里,每個 從服務節點里用於普通操作(非 發布和訂閱)連接的連接池最大容量。連接池的連接數量自動彈性伸縮。
masterConnectionMinimumIdleSize(主節點最小空閑連接數)
默認值:
32多從節點的環境里,每個 主節點的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時寫入反應速度。
masterConnectionPoolSize(主節點連接池大小)
默認值:
64主節點的連接池最大容量。連接池的連接數量自動彈性伸縮。
idleConnectionTimeout(連接空閑超時,單位:毫秒)
默認值:
10000如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,並從連接池里去掉。時間單位是毫秒。
connectTimeout(連接超時,單位:毫秒)
默認值:
10000同任何節點建立連接時的等待超時。時間單位是毫秒。
timeout(命令等待超時,單位:毫秒)
默認值:
3000等待節點回復命令的時間。該時間從命令發送成功時開始計時。
retryAttempts(命令失敗重試次數)
默認值:
3如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時。
retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:
1500在一條命令發送失敗以后,等待重試發送的時間間隔。時間單位是毫秒。
reconnectionTimeout(重新連接時間間隔,單位:毫秒)
默認值:
3000當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔。時間單位是毫秒。
failedAttempts(執行失敗最大次數)
默認值:
3在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試。
database(數據庫編號)
默認值:
0嘗試連接的數據庫編號。
password(密碼)
默認值:
null用於節點身份驗證的密碼。
subscriptionsPerConnection(單個連接最大訂閱數量)
默認值:
5每個連接的最大訂閱數量。
clientName(客戶端名稱)
默認值:
null在Redis節點里顯示的客戶端名稱。
sslEnableEndpointIdentification(啟用SSL終端識別)
默認值:
true開啟SSL終端識別能力。
sslProvider(SSL實現方式)
默認值:
JDK確定采用哪種方式(JDK或OPENSSL)來實現SSL連接。
sslTruststore(SSL信任證書庫路徑)
默認值:
null指定SSL信任證書庫的路徑。
sslTruststorePassword(SSL信任證書庫密碼)
默認值:
null指定SSL信任證書庫的密碼。
sslKeystore(SSL鑰匙庫路徑)
默認值:
null指定SSL鑰匙庫的路徑。
sslKeystorePassword(SSL鑰匙庫密碼)
默認值:
null指定SSL鑰匙庫的密碼。
集群模式
集群模式除了適用於Redis集群環境,也適用於任何雲計算服務商提供的集群模式,例如AWS ElastiCache集群版、Azure Redis Cache和阿里雲(Aliyun)的雲數據庫Redis版。
介紹配置Redis集群組態的文檔在這里。 Redis集群組態的最低要求是必須有三個主節點。
集群模式構造Config如下:
Config config = new Config();
config.useClusterServers()
.setScanInterval(2000) // 集群狀態掃描間隔時間,單位是毫秒
//可以用"rediss://"來啟用SSL連接
.addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
.addNodeAddress("redis://127.0.0.1:7002");
RedissonClient redisson = Redisson.create(config);
集群模式使用到ClusterServersConfig :
ClusterServersConfig clusterConfig = config.useClusterServers();
ClusterServersConfig 配置參數如下:
nodeAddresses(添加節點地址)
可以通過
host:port的格式來添加Redis集群節點的地址。多個節點可以一次性批量添加。scanInterval(集群掃描間隔時間)
默認值:
1000對Redis集群節點狀態掃描的時間間隔。單位是毫秒。
slots(分片數量)
默認值:
231用於指定數據分片過程中的分片數量。支持數據分片/框架結構有:集(Set)、映射(Map)、BitSet、Bloom filter, Spring Cache和Hibernate Cache等.readMode(讀取操作的負載均衡模式)
默認值:
SLAVE(只在從服務節點里讀取)注:在從服務節點里讀取的數據說明已經至少有兩個節點保存了該數據,確保了數據的高可用性。
設置讀取操作選擇節點的模式。可用值為:
SLAVE- 只在從服務節點里讀取。MASTER- 只在主服務節點里讀取。MASTER_SLAVE- 在主從服務節點里都可以讀取。subscriptionMode(訂閱操作的負載均衡模式)
默認值:
SLAVE(只在從服務節點里訂閱)設置訂閱操作選擇節點的模式。可用值為:
SLAVE- 只在從服務節點里訂閱。MASTER- 只在主服務節點里訂閱。loadBalancer(負載均衡算法類的選擇)
默認值:
org.redisson.connection.balancer.RoundRobinLoadBalancer在多Redis服務節點的環境里,可以選用以下幾種負載均衡方式選擇一個節點:
org.redisson.connection.balancer.WeightedRoundRobinBalancer- 權重輪詢調度算法org.redisson.connection.balancer.RoundRobinLoadBalancer- 輪詢調度算法org.redisson.connection.balancer.RandomLoadBalancer- 隨機調度算法subscriptionConnectionMinimumIdleSize(從節點發布和訂閱連接的最小空閑連接數)
默認值:
1多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的最小保持連接數(長連接)。Redisson內部經常通過發布和訂閱來實現許多功能。長期保持一定數量的發布訂閱連接是必須的。
subscriptionConnectionPoolSize(從節點發布和訂閱連接池大小)
默認值:
50多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的連接池最大容量。連接池的連接數量自動彈性伸縮。
slaveConnectionMinimumIdleSize(從節點最小空閑連接數)
默認值:
32多從節點的環境里,每個 從服務節點里用於普通操作(非 發布和訂閱)的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時讀取反映速度。
slaveConnectionPoolSize(從節點連接池大小)
默認值:
64多從節點的環境里,每個 從服務節點里用於普通操作(非 發布和訂閱)連接的連接池最大容量。連接池的連接數量自動彈性伸縮。
masterConnectionMinimumIdleSize(主節點最小空閑連接數)
默認值:
32多節點的環境里,每個 主節點的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時寫入反應速度。
masterConnectionPoolSize(主節點連接池大小)
默認值:
64多主節點的環境里,每個 主節點的連接池最大容量。連接池的連接數量自動彈性伸縮。
idleConnectionTimeout(連接空閑超時,單位:毫秒)
默認值:
10000如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,並從連接池里去掉。時間單位是毫秒。
connectTimeout(連接超時,單位:毫秒)
默認值:
10000同任何節點建立連接時的等待超時。時間單位是毫秒。
timeout(命令等待超時,單位:毫秒)
默認值:
3000等待節點回復命令的時間。該時間從命令發送成功時開始計時。
retryAttempts(命令失敗重試次數)
默認值:
3如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時。
retryInterval(命令重試發送時間間隔,單位:毫秒)
默認值:
1500在一條命令發送失敗以后,等待重試發送的時間間隔。時間單位是毫秒。
reconnectionTimeout(重新連接時間間隔,單位:毫秒)
默認值:
3000當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔。時間單位是毫秒。
failedAttempts(執行失敗最大次數)
默認值:
3在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試。
password(密碼)
默認值:
null用於節點身份驗證的密碼。
subscriptionsPerConnection(單個連接最大訂閱數量)
默認值:
5每個連接的最大訂閱數量。
clientName(客戶端名稱)
默認值:
null在Redis節點里顯示的客戶端名稱。
sslEnableEndpointIdentification(啟用SSL終端識別)
默認值:
true開啟SSL終端識別能力。
sslProvider(SSL實現方式)
默認值:
JDK確定采用哪種方式(JDK或OPENSSL)來實現SSL連接。
sslTruststore(SSL信任證書庫路徑)
默認值:
null指定SSL信任證書庫的路徑。
sslTruststorePassword(SSL信任證書庫密碼)
默認值:
null指定SSL信任證書庫的密碼。
sslKeystore(SSL鑰匙庫路徑)
默認值:
null指定SSL鑰匙庫的路徑。
sslKeystorePassword(SSL鑰匙庫密碼)
默認值:
null指定SSL鑰匙庫的密碼。
簡單Redision鎖的原理
Redis發展到現在,幾種常見的部署架構有:
- 單機模式;
- 哨兵模式;
- 集群模式;
先介紹,基於單機模式的簡單Redision鎖的使用。
簡單Redision鎖的使用
單機模式下,簡單Redision鎖的使用如下:
// 構造redisson實現分布式鎖必要的Config
Config config = new Config();
config.useSingleServer().setAddress("redis://172.29.1.180:5379").setPassword("a123456").setDatabase(0);
// 構造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 設置鎖定資源名稱
RLock disLock = redissonClient.getLock("DISLOCK");
//嘗試獲取分布式鎖
boolean isLock= disLock.tryLock(500, 15000, TimeUnit.MILLISECONDS);
if (isLock) {
try {
//TODO if get lock success, do something;
Thread.sleep(15000);
} catch (Exception e) {
} finally {
// 無論如何, 最后都要解鎖
disLock.unlock();
}
}
通過代碼可知,經過Redisson的封裝,實現Redis分布式鎖非常方便,和顯式鎖的使用方法是一樣的。RLock接口繼承了 Lock接口。
我們再看一下Redis中的value是啥,和前文分析一樣,hash結構, redis 的key就是資源名稱。
hash結構的key就是UUID+threadId,hash結構的value就是重入值,在分布式鎖時,這個值為1(Redisson還可以實現重入鎖,那么這個值就取決於重入次數了):
172.29.1.180:5379> hgetall DISLOCK
1) "01a6d806-d282-4715-9bec-f51b9aa98110:1"
2) "1"
使用客戶端工具看到的效果如下:

getLock()方法
可以看到,調用getLock()方法后實際返回一個RedissonLock對象
tryLock方法
下面來看下tryLock方法,源碼如下:
@Override
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long time = unit.toMillis(waitTime);
long current = System.currentTimeMillis();
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
current = System.currentTimeMillis();
RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
if (!subscribeFuture.cancel(false)) {
subscribeFuture.onComplete((res, e) -> {
if (e == null) {
unsubscribe(subscribeFuture, threadId);
}
});
}
acquireFailed(threadId);
return false;
}
try {
time -= System.currentTimeMillis() - current;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
while (true) {
long currentTime = System.currentTimeMillis();
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return true;
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
// waiting for message
currentTime = System.currentTimeMillis();
if (ttl >= 0 && ttl < time) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
}
time -= System.currentTimeMillis() - currentTime;
if (time <= 0) {
acquireFailed(threadId);
return false;
}
}
} finally {
unsubscribe(subscribeFuture, threadId);
}
// return get(tryLockAsync(waitTime, leaseTime, unit));
}
以上代碼使用了異步回調模式,RFuture 繼承了 java.util.concurrent.Future
tryAcquire()方法
在RedissonLock對象的lock()方法主要調用tryAcquire()方法

tryLockInnerAsync
由於leaseTime == -1,於是走tryLockInnerAsync()方法,這個方法才是關鍵
首先,看一下evalWriteAsync方法的定義
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
這和前面的jedis調用lua腳本類似,最后兩個參數分別是keys和params。
單獨將調用的那一段摘出來看,實際調用是這樣的:
commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
"if (redis.call('exists', KEYS[1]) == 0) then " +
"redis.call('hset', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return nil; " +
"end; " +
"return redis.call('pttl', KEYS[1]);",
Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
結合上面的參數聲明,我們可以知道,這里KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)
假設:
- 前面獲取鎖時傳的name是“DISLOCK”,
- 假設調用的線程ID是1,
- 假設成員變量UUID類型的id是01a6d806-d282-4715-9bec-f51b9aa98110
那么KEYS[1]=DISLOCK,ARGV[2]=01a6d806-d282-4715-9bec-f51b9aa98110:1
因此,這段腳本的意思是
1、判斷有沒有一個叫“DISLOCK”的key
2、如果沒有,則在其下設置一個字段為“01a6d806-d282-4715-9bec-f51b9aa98110:1”,值為“1”的鍵值對 ,並設置它的過期時間
3、如果存在,則進一步判斷“01a6d806-d282-4715-9bec-f51b9aa98110:1”是否存在,若存在,則其值加1,並重新設置過期時間
4、返回“DISLOCK”的生存時間(毫秒)
原理:加鎖機制
這里用的數據結構是hash,hash的結構是: key 字段1 值1 字段2 值2 。。。
用在鎖這個場景下,key就表示鎖的名稱,也可以理解為臨界資源,字段就表示當前獲得鎖的線程
所有競爭這把鎖的線程都要判斷在這個key下有沒有自己線程的字段,如果沒有則不能獲得鎖,如果有,則相當於重入,字段值加1(次數)

Lua腳本的詳解
為何要使用lua語言?
因為一大堆復雜的業務邏輯,可以通過封裝在lua腳本中發送給redis,保證這段復雜業務邏輯執行的原子性

回顧一下evalWriteAsync方法的定義
<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);
注意,其最后兩個參數分別是keys和params。
關於 lua腳本的參數解釋:
KEYS[1]代表的是你加鎖的那個key,比如說:
RLock lock = redisson.getLock("DISLOCK");
這里你自己設置了加鎖的那個鎖key就是“DISLOCK”。
ARGV[1]代表的就是鎖key的默認生存時間
調用的時候,傳遞的參數為 internalLockLeaseTime ,該值默認30秒。
ARGV[2]代表的是加鎖的客戶端的ID,類似於下面這樣:
01a6d806-d282-4715-9bec-f51b9aa98110:1
lua腳本的第一段if判斷語句,就是用“exists DISLOCK”命令判斷一下,如果你要加鎖的那個鎖key不存在的話,你就進行加鎖。
如何加鎖呢?很簡單,用下面的redis命令:
hset DISLOCK 01a6d806-d282-4715-9bec-f51b9aa98110:1 1
通過這個命令設置一個hash數據結構,這行命令執行后,會出現一個類似下面的數據結構:
DISLOCK:
{
8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
}
接着會執行“pexpire DISLOCK 30000”命令,設置DISLOCK這個鎖key的生存時間是30秒(默認)
鎖互斥機制
那么在這個時候,如果客戶端2來嘗試加鎖,執行了同樣的一段lua腳本,會咋樣呢?
很簡單,第一個if判斷會執行“exists DISLOCK”,發現DISLOCK 這個鎖key已經存在了。
接着第二個if判斷,判斷一下,DISLOCK鎖key的hash數據結構中,是否包含客戶端2的ID,但是明顯不是的,因為那里包含的是客戶端1的ID。
所以,客戶端2會獲取到pttl DISLOCK返回的一個數字,這個數字代表了DISLOCK 這個鎖key的剩余生存時間。比如還剩15000毫秒的生存時間。
此時客戶端2會進入一個while循環,不停的嘗試加鎖。
可重入加鎖機制
如果客戶端1都已經持有了這把鎖了,結果可重入的加鎖會怎么樣呢?
RLock lock = redisson.getLock("DISLOCK")
lock.lock();
//業務代碼
lock.lock();
//業務代碼
lock.unlock();
lock.unlock();
分析上面那段lua腳本。
第一個if判斷肯定不成立,“exists DISLOCK”會顯示鎖key已經存在了。
第二個if判斷會成立,因為DISLOCK的hash數據結構中包含的那個ID,就是客戶端1的那個ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”
此時就會執行可重入加鎖的邏輯,他會用:
incrby DISLOCK
8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
通過這個命令,對客戶端1的加鎖次數,累加1。
此時DISLOCK數據結構變為下面這樣:
DISLOCK:
{
8743c9c0-0795-4907-87fd-6c719a6b4586:1 2
}
釋放鎖機制
如果執行lock.unlock(),就可以釋放分布式鎖,此時的業務邏輯也是非常簡單的。
其實說白了,就是每次都對DISLOCK數據結構中的那個加鎖次數減1。
如果發現加鎖次數是0了,說明這個客戶端已經不再持有鎖了,此時就會用:
“del DISLOCK”命令,從redis里刪除這個key。
然后呢,另外的客戶端2就可以嘗試完成加鎖了。
unlock 源碼
@Override
public void unlock() {
try {
get(unlockAsync(Thread.currentThread().getId()));
} catch (RedisException e) {
if (e.getCause() instanceof IllegalMonitorStateException) {
throw (IllegalMonitorStateException) e.getCause();
} else {
throw e;
}
}
// Future<Void> future = unlockAsync();
// future.awaitUninterruptibly();
// if (future.isSuccess()) {
// return;
// }
// if (future.cause() instanceof IllegalMonitorStateException) {
// throw (IllegalMonitorStateException)future.cause();
// }
// throw commandExecutor.convertException(future);
}
再深入一下,實際調用的是unlockInnerAsync方法
unlockInnerAsync方法

原理:Redision 解鎖機制
上圖沒有截取完整,完整的源碼如下:
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
"return nil;" +
"end; " +
"local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
"if (counter > 0) then " +
"redis.call('pexpire', KEYS[1], ARGV[2]); " +
"return 0; " +
"else " +
"redis.call('del', KEYS[1]); " +
"redis.call('publish', KEYS[2], ARGV[1]); " +
"return 1; "+
"end; " +
"return nil;",
Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}
我們還是假設name=DISLOCK,假設線程ID是1
同理,我們可以知道
KEYS[1]是getName(),即KEYS[1]=DISLOCK
KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{DISLOCK}
ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0
ARGV[2]是生存時間
ARGV[3]是getLockName(threadId),即ARGV[3]=8743c9c0-0795-4907-87fd-6c719a6b4586:1
因此,上面腳本的意思是:
1、判斷是否存在一個叫“DISLOCK”的key
2、如果不存在,返回nil
3、如果存在,使用Redis Hincrby 命令用於為哈希表中的字段值加上指定增量值 -1 ,代表減去1
4、若counter >,返回空,若字段存在,則字段值減1
5、若減完以后,counter > 0 值仍大於0,則返回0
6、減完后,若字段值小於或等於0,則用 publish 命令廣播一條消息,廣播內容是0,並返回1;
可以猜測,廣播0表示資源可用,即通知那些等待獲取鎖的線程現在可以獲得鎖了

通過redis Channel 解鎖訂閱
以上是正常情況下獲取到鎖的情況,那么當無法立即獲取到鎖的時候怎么辦呢?
再回到前面獲取鎖的位置
@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
long threadId = Thread.currentThread().getId();
Long ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
return;
}
// 訂閱
RFuture<RedissonLockEntry> future = subscribe(threadId);
commandExecutor.syncSubscription(future);
try {
while (true) {
ttl = tryAcquire(leaseTime, unit, threadId);
// lock acquired
if (ttl == null) {
break;
}
// waiting for message
if (ttl >= 0) {
getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
} else {
getEntry(threadId).getLatch().acquire();
}
}
} finally {
unsubscribe(future, threadId);
}
// get(lockAsync(leaseTime, unit));
}
protected static final LockPubSub PUBSUB = new LockPubSub();
protected RFuture<RedissonLockEntry> subscribe(long threadId) {
return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}
protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}
這里會訂閱Channel,當資源可用時可以及時知道,並搶占,防止無效的輪詢而浪費資源
這里的channel為:
redisson_lock__channel:


當資源可用用的時候,循環去嘗試獲取鎖,由於多個線程同時去競爭資源,所以這里用了信號量,對於同一個資源只允許一個線程獲得鎖,其它的線程阻塞
這點,有點兒類似 Zookeeper分布式鎖:
有關zookeeper分布式鎖的原理和實現,具體請參見下面的博客:
Zookeeper 分布式鎖 (圖解+秒懂+史上最全)
watch dog自動延期機制
客戶端1加鎖的鎖key默認生存時間才30秒,如果超過了30秒,客戶端1還想一直持有這把鎖,怎么辦呢?
簡單!只要客戶端1一旦加鎖成功,就會啟動一個watch dog看門狗,他是一個后台線程,會每隔10秒檢查一下,如果客戶端1還持有鎖key,那么就會不斷的延長鎖key的生存時間。
使用watchDog機制實現鎖的續期
但是聰明的同學肯定會問:
有效時間設置多長,假如我的業務操作比有效時間長,我的業務代碼還沒執行完,就自動給我解鎖了,不就完蛋了嗎。
這個問題就有點棘手了,在網上也有很多討論:
第一種解決方法就是靠程序員自己去把握,預估一下業務代碼需要執行的時間,然后設置有效期時間比執行時間長一些,保證不會因為自動解鎖影響到客戶端業務代碼的執行。
但是這並不是萬全之策,比如網絡抖動這種情況是無法預測的,也有可能導致業務代碼執行的時間變長,所以並不安全。
第二種方法,使用監事狗watchDog機制實現鎖的續期。
第二種方法比較靠譜一點,而且無業務入侵。
在Redisson框架實現分布式鎖的思路,就使用watchDog機制實現鎖的續期。
當加鎖成功后,同時開啟守護線程,默認有效期是30秒,每隔10秒就會給鎖續期到30秒,只要持有鎖的客戶端沒有宕機,就能保證一直持有鎖,直到業務代碼執行完畢由客戶端自己解鎖,如果宕機了自然就在有效期失效后自動解鎖。
這里,和前面解決 JVM STW的鎖過期問題有點類似,只不過,watchDog自動續期,也沒有完全解決JVM STW的鎖過期問題。
如何徹底解決 JVM STW的鎖過期問題,可以來瘋狂創客圈的社群討論。
redisson watchdog 使用和原理
實際上,redisson加鎖的基本流程圖如下:

這里專注於介紹watchdog。
首先watchdog的具體思路是 加鎖時,默認加鎖 30秒,每10秒鍾檢查一次,如果存在就重新設置 過期時間為30秒。
然后設置默認加鎖時間的參數是 lockWatchdogTimeout(監控鎖的看門狗超時,單位:毫秒)
官方文檔描述如下
lockWatchdogTimeout(監控鎖的看門狗超時,單位:毫秒)
默認值:
30000監控鎖的看門狗超時時間單位為毫秒。該參數只適用於分布式鎖的加鎖請求中未明確使用
leaseTimeout參數的情況。如果該看門狗未使用lockWatchdogTimeout去重新調整一個分布式鎖的lockWatchdogTimeout超時,那么這個鎖將變為失效狀態。這個參數可以用來避免由Redisson客戶端節點宕機或其他原因造成死鎖的情況。
需要注意的是
1.watchDog 只有在未顯示指定加鎖時間時才會生效。(這點很重要)
2.lockWatchdogTimeout設定的時間不要太小 ,比如我之前設置的是 100毫秒,由於網絡直接導致加鎖完后,watchdog去延期時,這個key在redis中已經被刪除了。
tryAcquireAsync原理
在調用lock方法時,會最終調用到tryAcquireAsync。詳細解釋如下:
private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
//如果指定了加鎖時間,會直接去加鎖
if (leaseTime != -1) {
return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
}
//沒有指定加鎖時間 會先進行加鎖,並且默認時間就是 LockWatchdogTimeout的時間
//這個是異步操作 返回RFuture 類似netty中的future
RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
//這里也是類似netty Future 的addListener,在future內容執行完成后執行
ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
if (e != null) {
return;
}
// lock acquired
if (ttlRemaining == null) {
//這里是定時執行 當前鎖自動延期的動作
scheduleExpirationRenewal(threadId);
}
});
return ttlRemainingFuture;
}
scheduleExpirationRenewal 中會調用renewExpiration。
renewExpiration執行延期動作
這里我們可以看到是 啟用了一個timeout定時,去執行延期動作
private void renewExpiration() {
Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
if (ent == null) {
return;
}
Long threadId = ent.getFirstThreadId();
if (threadId == null) {
return;
}
RFuture<Boolean> future = renewExpirationAsync(threadId);
future.onComplete((res, e) -> {
if (e != null) {
log.error("Can't update lock " + getName() + " expiration", e);
return;
}
if (res) {
//如果 沒有報錯,就再次定時延期
// reschedule itself
renewExpiration();
}
});
}
// 這里我們可以看到定時任務 是 lockWatchdogTimeout 的1/3時間去執行 renewExpirationAsync
}, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
ee.setTimeout(task);
}
最終 scheduleExpirationRenewal會調用到 renewExpirationAsync,
renewExpirationAsync
執行下面這段 lua腳本。他主要判斷就是 這個鎖是否在redis中存在,如果存在就進行 pexpire 延期。
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
"redis.call('pexpire', KEYS[1], ARGV[1]); " +
"return 1; " +
"end; " +
"return 0;",
Collections.singletonList(getName()),
internalLockLeaseTime, getLockName(threadId));
}
watchLog總結
1.要使 watchLog機制生效 ,lock時 不要設置 過期時間
2.watchlog的延時時間 可以由 lockWatchdogTimeout指定默認延時時間,但是不要設置太小。如100
3.watchdog 會每 lockWatchdogTimeout/3時間,去延時。
4.watchdog 通過 類似netty的 Future功能來實現異步延時
5.watchdog 最終還是通過 lua腳本來進行延時
Redisson框架的分布式鎖
Redisson框架十分強大,除了前面介紹的 getLock方法獲取的分布式鎖(輸入可重入鎖的類型),還有很多其他的分布式鎖類型。
總體的Redisson框架的分布式鎖類型,大致如下:
- 可重入鎖
- 公平鎖
- 聯鎖
- 紅鎖
- 讀寫鎖
- 信號量
- 可過期信號量
- 閉鎖(/倒數閂)
1.可重入鎖(Reentrant Lock)
Redisson的分布式可重入鎖RLock Java對象實現了java.util.concurrent.locks.Lock接口,同時還支持自動過期解鎖。
public void testReentrantLock(RedissonClient redisson){
RLock lock = redisson.getLock("anyLock");
try{
// 1. 最常見的使用方法
//lock.lock();
// 2. 支持過期解鎖功能,10秒鍾以后自動解鎖, 無需調用unlock方法手動解鎖
//lock.lock(10, TimeUnit.SECONDS);
// 3. 嘗試加鎖,最多等待3秒,上鎖以后10秒自動解鎖
boolean res = lock.tryLock(3, 10, TimeUnit.SECONDS);
if(res){ //成功
// do your business
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
Redisson同時還為分布式鎖提供了異步執行的相關方法:
public void testAsyncReentrantLock(RedissonClient redisson){
RLock lock = redisson.getLock("anyLock");
try{
lock.lockAsync();
lock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = lock.tryLockAsync(3, 10, TimeUnit.SECONDS);
if(res.get()){
// do your business
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
2.公平鎖(Fair Lock)
Redisson分布式可重入公平鎖也是實現了java.util.concurrent.locks.Lock接口的一種RLock對象。在提供了自動過期解鎖功能的同時,保證了當多個Redisson客戶端線程同時請求加鎖時,優先分配給先發出請求的線程。
public void testFairLock(RedissonClient redisson){
RLock fairLock = redisson.getFairLock("anyLock");
try{
// 最常見的使用方法
fairLock.lock();
// 支持過期解鎖功能, 10秒鍾以后自動解鎖,無需調用unlock方法手動解鎖
fairLock.lock(10, TimeUnit.SECONDS);
// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
fairLock.unlock();
}
}
Redisson同時還為分布式可重入公平鎖提供了異步執行的相關方法:
RLock fairLock = redisson.getFairLock("anyLock");
fairLock.lockAsync();
fairLock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);
3.聯鎖(MultiLock)
Redisson的RedissonMultiLock對象可以將多個RLock對象關聯為一個聯鎖,每個RLock對象實例可以來自於不同的Redisson實例。
public void testMultiLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){
RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");
RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
try {
// 同時加鎖:lock1 lock2 lock3, 所有的鎖都上鎖成功才算成功。
lock.lock();
// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
4.紅鎖(RedLock)
Redisson的RedissonRedLock對象實現了Redlock介紹的加鎖算法。該對象也可以用來將多個RLock對象關聯為一個紅鎖,每個RLock對象實例可以來自於不同的Redisson實例。
public void testRedLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){
RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");
RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
try {
// 同時加鎖:lock1 lock2 lock3, 紅鎖在大部分節點上加鎖成功就算成功。
lock.lock();
// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
5.讀寫鎖(ReadWriteLock)
Redisson的分布式可重入讀寫鎖RReadWriteLock,Java對象實現了java.util.concurrent.locks.ReadWriteLock接口。同時還支持自動過期解鎖。該對象允許同時有多個讀取鎖,但是最多只能有一個寫入鎖。
RReadWriteLock rwlock = redisson.getLock("anyRWLock");
// 最常見的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
// 支持過期解鎖功能
// 10秒鍾以后自動解鎖
// 無需調用unlock方法手動解鎖
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
6.信號量(Semaphore)
Redisson的分布式信號量(Semaphore)Java對象RSemaphore采用了與java.util.concurrent.Semaphore相似的接口和用法。
RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();
7.可過期性信號量(PermitExpirableSemaphore)
Redisson的可過期性信號量(PermitExpirableSemaphore)實在RSemaphore對象的基礎上,為每個信號增加了一個過期時間。每個信號可以通過獨立的ID來辨識,釋放時只能通過提交這個ID才能釋放。
RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 獲取一個信號,有效期只有2秒鍾。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);
8.閉鎖/倒數閂(CountDownLatch)
Redisson的分布式閉鎖(CountDownLatch)Java對象RCountDownLatch采用了與java.util.concurrent.CountDownLatch相似的接口和用法。
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他線程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();
redis分布式鎖的高可用
關於Redis分布式鎖的高可用問題,大致如下:
在master- slave的集群架構中,就是如果你對某個redis master實例,寫入了DISLOCK這種鎖key的value,此時會異步復制給對應的master slave實例。
但是,這個過程中一旦發生redis master宕機,主備切換,redis slave變為了redis master。而此時的主從復制沒有徹底完成.....
接着就會導致,客戶端2來嘗試加鎖的時候,在新的redis master上完成了加鎖,而客戶端1也以為自己成功加了鎖。
此時就會導致多個客戶端對一個分布式鎖完成了加鎖。
這時系統在業務語義上一定會出現問題,導致臟數據的產生。
所以這個是是redis master-slave架構的主從異步復制導致的redis分布式鎖的最大缺陷:
在redis master實例宕機的時候,可能導致多個客戶端同時完成加鎖。
高可用的RedLock(紅鎖)原理
RedLock算法思想:
不能只在一個redis實例上創建鎖,應該是在多個redis實例上創建鎖,n / 2 + 1,必須在大多數redis節點上都成功創建鎖,才能算這個整體的RedLock加鎖成功,避免說僅僅在一個redis實例上加鎖而帶來的問題。
這個場景是假設有一個 redis cluster,有 5 個 redis master 實例。然后執行如下步驟獲取一把紅鎖:
- 獲取當前時間戳,單位是毫秒;
- 跟上面類似,輪流嘗試在每個 master 節點上創建鎖,過期時間較短,一般就幾十毫秒;
- 嘗試在大多數節點上建立一個鎖,比如 5 個節點就要求是 3 個節點 n / 2 + 1;
- 客戶端計算建立好鎖的時間,如果建立鎖的時間小於超時時間,就算建立成功了;
- 要是鎖建立失敗了,那么就依次之前建立過的鎖刪除;
- 只要別人建立了一把分布式鎖,你就得不斷輪詢去嘗試獲取鎖。

RedLock是基於redis實現的分布式鎖,它能夠保證以下特性:
-
互斥性:在任何時候,只能有一個客戶端能夠持有鎖;避免死鎖:
-
當客戶端拿到鎖后,即使發生了網絡分區或者客戶端宕機,也不會發生死鎖;(利用key的存活時間)
-
容錯性:只要多數節點的redis實例正常運行,就能夠對外提供服務,加鎖或者釋放鎖;
以sentinel模式架構為例,如下圖所示,有sentinel-1,sentinel-2,sentinel-3總計3個sentinel模式集群,如果要獲取分布式鎖,那么需要向這3個sentinel集群通過EVAL命令執行LUA腳本,需要3/2+1=2,即至少2個sentinel集群響應成功,才算成功的以Redlock算法獲取到分布式鎖:

高可用的紅鎖會導致性能降低
提前說明,使用redis分布式鎖,是追求高性能, 在cap理論中,追求的是 ap 而不是cp。
所以,如果追求高可用,建議使用 zookeeper分布式鎖。
redis分布式鎖可能導致的數據不一致性,建議使用業務補償的方式去彌補。所以,不太建議使用紅鎖,但是從學習的層面來說,大家還是一定要掌握的。
實現原理
Redisson中有一個MultiLock的概念,可以將多個鎖合並為一個大鎖,對一個大鎖進行統一的申請加鎖以及釋放鎖
而Redisson中實現RedLock就是基於MultiLock 去做的,接下來就具體看看對應的實現吧
RedLock使用案例
先看下官方的代碼使用:
(https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers#84-redlock)
RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");
RLock redLock = anyRedisson.getRedLock(lock1, lock2, lock3);
// traditional lock method
redLock.lock();
// or acquire lock and automatically unlock it after 10 seconds
redLock.lock(10, TimeUnit.SECONDS);
// or wait for lock aquisition up to 100 seconds
// and automatically unlock it after 10 seconds
boolean res = redLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
redLock.unlock();
}
}
這里是分別對3個redis實例加鎖,然后獲取一個最后的加鎖結果。
RedissonRedLock實現原理
上面示例中使用redLock.lock()或者tryLock()最終都是執行RedissonRedLock中方法。
RedissonRedLock 繼承自RedissonMultiLock, 實現了其中的一些方法:
public class RedissonRedLock extends RedissonMultiLock {
public RedissonRedLock(RLock... locks) {
super(locks);
}
/**
* 鎖可以失敗的次數,鎖的數量-鎖成功客戶端最小的數量
*/
@Override
protected int failedLocksLimit() {
return locks.size() - minLocksAmount(locks);
}
/**
* 鎖的數量 / 2 + 1,例如有3個客戶端加鎖,那么最少需要2個客戶端加鎖成功
*/
protected int minLocksAmount(final List<RLock> locks) {
return locks.size()/2 + 1;
}
/**
* 計算多個客戶端一起加鎖的超時時間,每個客戶端的等待時間
* remainTime默認為4.5s
*/
@Override
protected long calcLockWaitTime(long remainTime) {
return Math.max(remainTime / locks.size(), 1);
}
@Override
public void unlock() {
unlockInner(locks);
}
}
看到locks.size()/2 + 1 ,例如我們有3個客戶端實例,那么最少2個實例加鎖成功才算分布式鎖加鎖成功。
接着我們看下lock()的具體實現
RedissonMultiLock實現原理
public class RedissonMultiLock implements Lock {
final List<RLock> locks = new ArrayList<RLock>();
public RedissonMultiLock(RLock... locks) {
if (locks.length == 0) {
throw new IllegalArgumentException("Lock objects are not defined");
}
this.locks.addAll(Arrays.asList(locks));
}
public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
long newLeaseTime = -1;
if (leaseTime != -1) {
// 如果等待時間設置了,那么將等待時間 * 2
newLeaseTime = unit.toMillis(waitTime)*2;
}
// time為當前時間戳
long time = System.currentTimeMillis();
long remainTime = -1;
if (waitTime != -1) {
remainTime = unit.toMillis(waitTime);
}
// 計算鎖的等待時間,RedLock中:如果remainTime=-1,那么lockWaitTime為1
long lockWaitTime = calcLockWaitTime(remainTime);
// RedLock中failedLocksLimit即為n/2 + 1
int failedLocksLimit = failedLocksLimit();
List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
// 循環每個redis客戶端,去獲取鎖
for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
RLock lock = iterator.next();
boolean lockAcquired;
try {
// 調用tryLock方法去獲取鎖,如果獲取鎖成功,則lockAcquired=true
if (waitTime == -1 && leaseTime == -1) {
lockAcquired = lock.tryLock();
} else {
long awaitTime = Math.min(lockWaitTime, remainTime);
lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
}
} catch (Exception e) {
lockAcquired = false;
}
// 如果獲取鎖成功,將鎖加入到list集合中
if (lockAcquired) {
acquiredLocks.add(lock);
} else {
// 如果獲取鎖失敗,判斷失敗次數是否等於失敗的限制次數
// 比如,3個redis客戶端,最多只能失敗1次
// 這里locks.size = 3, 3-x=1,說明只要成功了2次就可以直接break掉循環
if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
break;
}
// 如果最大失敗次數等於0
if (failedLocksLimit == 0) {
// 釋放所有的鎖,RedLock加鎖失敗
unlockInner(acquiredLocks);
if (waitTime == -1 && leaseTime == -1) {
return false;
}
failedLocksLimit = failedLocksLimit();
acquiredLocks.clear();
// 重置迭代器 重試再次獲取鎖
while (iterator.hasPrevious()) {
iterator.previous();
}
} else {
// 失敗的限制次數減一
// 比如3個redis實例,最大的限制次數是1,如果遍歷第一個redis實例,失敗了,那么failedLocksLimit會減成0
// 如果failedLocksLimit就會走上面的if邏輯,釋放所有的鎖,然后返回false
failedLocksLimit--;
}
}
if (remainTime != -1) {
remainTime -= (System.currentTimeMillis() - time);
time = System.currentTimeMillis();
if (remainTime <= 0) {
unlockInner(acquiredLocks);
return false;
}
}
}
if (leaseTime != -1) {
List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
for (RLock rLock : acquiredLocks) {
RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
futures.add(future);
}
for (RFuture<Boolean> rFuture : futures) {
rFuture.syncUninterruptibly();
}
}
return true;
}
}
核心代碼都已經加了注釋,實現原理其實很簡單,基於RedLock思想,遍歷所有的Redis客戶端,然后依次加鎖,最后統計成功的次數來判斷是否加鎖成功。
Redis分段鎖
普通Redis分布式鎖的性能瓶頸問題
分布式鎖一旦加了之后,對同一個商品的下單請求,會導致所有下單操作,都必須對同一個商品key加分布式鎖。
假設某個場景,一個商品1分鍾6000訂單,每秒的 600個下單操作,
假設加鎖之后,釋放鎖之前,查庫存 -> 創建訂單 -> 扣減庫存,每個IO操作100ms,大概300毫秒。
具體如下圖:

可以再進行一下優化,將 創建訂單 + 扣減庫存 並發執行,將兩個100ms 減少為一個100ms,這既是空間換時間的思想,大概200毫秒。

將 創建訂單 + 扣減庫存 批量執行,減少一次IO,也是大概200毫秒。
這個優化方案,有個重要的前提,就是 訂單表和庫存表在相同的庫中,但是,這個前提條件,在數據量大+高並發的場景下,夠嗆。
package com.crazymaker.springcloud;
import com.crazymaker.springcloud.common.util.ThreadUtil;
import org.junit.Test;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
public class CoCurrentDemo {
/**
* 使用CompletableFuture 和 CountDownLatch 進行並發回調
*/
@Test
public void testMutiCallBack() {
CountDownLatch countDownLatch = new CountDownLatch(10);
//批量異步
ExecutorService executor = ThreadUtil.getIoIntenseTargetThreadPool();
long start = System.currentTimeMillis();
for (int i = 0; i < 10; i++) {
CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
long tid = ThreadUtil.getCurThreadId();
try {
System.out.println("線程" + tid + "開始了,模擬一下遠程調用");
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
return tid;
}, executor);
future.thenAccept((tid) -> {
System.out.println("線程" + tid + "結束了");
countDownLatch.countDown();
});
}
try {
countDownLatch.await();
//輸出統計結果
float time = System.currentTimeMillis() - start;
System.out.println("所有任務已經執行完畢");
System.out.println("運行的時長為(ms):" + time);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
那么,一秒內,只能完成多少個商品的秒殺訂單的下單操作呢?
1000毫秒 / 200 =5 個訂單
如何達到每秒600個下單呢? 還是要從基礎知識里邊尋找答案?
分段加鎖的思想來源
分段加鎖的思想來源與基礎知識。
我經常在瘋狂創客圈社群里邊,對小伙伴們強調 基礎知識的重要性,反復強調, 《Java 高並發三部曲》 一定要多刷,最好刷三遍。
中 《Java 高並發核心編程 卷2》 介紹了 JUC的 LongAdder 和 ConcurrentHashMap的源碼和底層原理,他們提升性能的辦法是:
空間換時間, 分段加鎖
尤其是 LongAdder 的實現思想,可以用於 Redis分布式鎖 作為性能提升的手段,將 Redis分布式鎖 優化為 Redis分段鎖。
有關LongAdder 的系統化學習
有關LongAdder 的系統化學習,請參見 《Java 高並發核心編程 卷2》

使用Redis分段鎖提升秒殺的並發性能
回到前面的場景:
假設一個商品1分鍾6000訂單,每秒的 600個下單操作,
假設加鎖之后,釋放鎖之前,查庫存 -> 創建訂單 -> 扣減庫存,經過優化,每個IO操作100ms,大概200毫秒,一秒鍾5個訂單。
如何提高性能呢? 空間換時間
為了達到每秒600個訂單,可以將鎖分成 600 /5 =120 個段,反過來, 每個段1秒可以操作5次, 120個段,合起來,及時每秒操作600次。
進行搶奪鎖的,如果申請到一個具體的段呢?
每一次使用隨機算法,隨機到一個分段, 如果不行,就輪詢下一個分段,具體的流程,大致如下:

缺點:
這個是一個理論的時間預估,沒有扣除 嘗試下一個分段的 時間, 另外,實際上的性能, 會比理論上差,從咱們實操案例的測試結果,也可以證明這點。
實戰: 手寫一個Redis分段鎖
package com.crazymaker.springcloud.standard.lock;
import com.crazymaker.springcloud.common.util.RandomUtil;
import com.crazymaker.springcloud.common.util.ThreadUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@Slf4j
@Data
@AllArgsConstructor
public class JedisMultiSegmentLock implements Lock {
public static final int NO_SEG = -1;
//拿到鎖的線程
private Thread thread;
//拿到鎖的狀態
private volatile boolean isLocked = false;
//段數
private final int segAmount;
public static final int DEFAULT_TIMEOUT = 2000;
public static final Long WAIT_GAT = Long.valueOf(100);
//內部的鎖
InnerLock[] innerLocks = null;
//被鎖住的分段
int segmentIndexLocked = NO_SEG;
/**
* 默認為2000ms
*/
long expire = 2000L;
int segmentIndex = 0;
public JedisMultiSegmentLock(String lockKey, String requestId, int segAmount) {
this.segAmount = segAmount;
innerLocks = new InnerLock[segAmount];
for (int i = 0; i < this.segAmount; i++) {
//每一個分段,加上一個編號
String innerLockKey = lockKey + ":" + i;
innerLocks[i] = new InnerLock(innerLockKey, requestId);
}
segmentIndex = RandomUtil.randInModLower(this.segAmount);
}
/**
* 獲取一個分布式鎖 , 超時則返回失敗
*
* @return 獲鎖成功 - true | 獲鎖失敗 - false
*/
@Override
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
//本地可重入
if (isLocked && thread == Thread.currentThread()) {
return true;
}
expire = unit != null ? unit.toMillis(time) : DEFAULT_TIMEOUT;
long startMillis = System.currentTimeMillis();
Long millisToWait = expire;
boolean localLocked = false;
int turn = 1;
InnerLock innerLock = innerLocks[segmentIndex];
while (!localLocked) {
localLocked = innerLock.lock(expire);
if (!localLocked) {
millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
startMillis = System.currentTimeMillis();
if (millisToWait > 0L) {
/**
* 還沒有超時
*/
ThreadUtil.sleepMilliSeconds(WAIT_GAT);
log.info("睡眠一下,重新開始,turn:{},剩余時間:{}", turn++, millisToWait);
segmentIndex++;
if (segmentIndex >= this.segAmount) {
segmentIndex = 0;
}
innerLock = innerLocks[segmentIndex];
} else {
log.info("搶鎖超時");
return false;
}
} else {
segmentIndexLocked = segmentIndex;
isLocked = true;
localLocked = true;
thread = Thread.currentThread();
}
}
return isLocked;
}
/**
* 搶奪鎖
*/
@Override
public void lock() {
throw new IllegalStateException(
"方法 'lock' 尚未實現!");
}
//釋放鎖
@Override
public void unlock() {
if (segmentIndexLocked == NO_SEG) {
return;
}
this.innerLocks[segmentIndexLocked].unlock();
segmentIndexLocked = NO_SEG;
thread = null;
isLocked = false;
}
@Override
public Condition newCondition() {
throw new IllegalStateException(
"方法 'newCondition' 尚未實現!");
}
@Override
public void lockInterruptibly() throws InterruptedException {
throw new IllegalStateException(
"方法 'lockInterruptibly' 尚未實現!");
}
@Override
public boolean tryLock() {
throw new IllegalStateException(
"方法 'tryLock' 尚未實現!");
}
}
尼恩的忠實建議:
強烈參照 LongAdder ,手寫一個Redis分段鎖,
這里, 還是有點復雜,但是很重要,建議大家動手干一票.
- 理論水平的提升,看看視頻、看看書,只有兩個字,就是需要:多看。
- 實戰水平的提升,只有兩個字,就是需要:多干。
手寫一個Redis分段鎖的實操,是高並發實戰的重要動手實操之一。
有關Redis分段鎖的實操的具體材料、源碼、問題,歡迎來 瘋狂創客圈社群交流。
高並發Java發燒友社群 - 瘋狂創客圈 總入口 點擊了解詳情:
文章核心內容和源碼來源
圖書:《Netty Zookeeper Redis 高並發實戰》 圖書簡介 - 瘋狂創...
參考文檔:
圖書:《Netty Zookeeper Redis 高並發實戰》 圖書簡介 - 瘋狂創...
Distributed locks with Redis
how-to-do-distributed-locking
redisson watchdog 使用和原理
zookeeper實現分布式鎖_java_腳本之家
基於Zookeeper 的分布式鎖實現 - SegmentFault 思否
分布式鎖用 Redis 還是 Zookeeper - 知乎
ZooKeeper分布式鎖的實現原理 - 菜鳥奮斗史 - 博客園
https://blog.csdn.net/men_wen/article/details/72853078







