Redis分布式鎖 (圖解-秒懂-史上最全)


文章很長,而且持續更新,建議收藏起來,慢慢讀!瘋狂創客圈總目錄 博客園版 為您奉上珍貴的學習資源 :

免費贈送 :《尼恩Java面試寶典》 持續更新+ 史上最全 + 面試必備 2000頁+ 面試必備 + 大廠必備 +漲薪必備
免費贈送 經典圖書:《Java高並發核心編程(卷1)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:《Java高並發核心編程(卷2)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:《Java高並發核心編程(卷3)加強版》 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 經典圖書:尼恩Java面試寶典 最新版 面試必備 + 大廠必備 +漲薪必備 加尼恩免費領
免費贈送 資源寶庫: Java 必備 百度網盤資源大合集 價值>10000元 加尼恩領取


下面是與redis面試相關的系列博文,建議大家系統化、系統化的學習

Redis 面試題 - 收藏版 (持續更新、吐血推薦)

Redis集群 - 圖解 - 秒懂(史上最全)

redis cluster 集群 HA 原理和實操(史上最全、面試必備)

Redis與DB的數據一致性解決方案(史上最全)

Redis 分布式鎖 (圖解-秒懂-史上最全)

跨JVM的線程安全問題

在單體的應用開發場景中,在多線程的環境下,涉及並發同步的時候,為了保證一個代碼塊在同一時間只能由一個線程訪問,我們一般可以使用synchronized語法和ReetrantLock去保證,這實際上是本地鎖的方式。

也就是說,在同一個JVM內部,大家往往采用synchronized或者Lock的方式來解決多線程間的安全問題。但在分布式集群工作的開發場景中,在JVM之間,那么就需要一種更加高級的鎖機制,來處理種跨JVM進程之間的線程安全問題.

解決方案是:使用分布式鎖

總之,對於分布式場景,我們可以使用分布式鎖,它是控制分布式系統之間互斥訪問共享資源的一種方式。

比如說在一個分布式系統中,多台機器上部署了多個服務,當客戶端一個用戶發起一個數據插入請求時,如果沒有分布式鎖機制保證,那么那多台機器上的多個服務可能進行並發插入操作,導致數據重復插入,對於某些不允許有多余數據的業務來說,這就會造成問題。而分布式鎖機制就是為了解決類似這類問題,保證多個服務之間互斥的訪問共享資源,如果一個服務搶占了分布式鎖,其他服務沒獲取到鎖,就不進行后續操作。

大致意思如下圖所示(不一定准確):

在這里插入圖片描述

說明:本文會以pdf格式持續更新,更多最新尼恩3高pdf筆記,請從下面的鏈接獲取:語雀 或者 碼雲


何為分布式鎖?

何為分布式鎖?

  • 當在分布式模型下,數據只有一份(或有限制),此時需要利用鎖的技術控制某一時刻修改數據的進程數。
  • 用一個狀態值表示鎖,對鎖的占用和釋放通過狀態值來標識。

分布式鎖的條件:

  • 互斥性。在任意時刻,只有一個客戶端能持有鎖。
  • 不會發生死鎖。即使有一個客戶端在持有鎖的期間崩潰而沒有主動解鎖,也能保證后續其他客戶端能加鎖。
  • 具有容錯性。只要大部分的 Redis 節點正常運行,客戶端就可以加鎖和解鎖。
  • 解鈴還須系鈴人。加鎖和解鎖必須是同一個客戶端,客戶端自己不能把別人加的鎖給解了。

分布式鎖的實現:

分布式鎖的實現由很多種,文件鎖、數據庫、redis等等,比較多;分布式鎖常見的多種實現方式:

  1. 數據庫悲觀鎖、
  2. 數據庫樂觀鎖;
  3. 基於Redis的分布式鎖;
  4. 基於ZooKeeper的分布式鎖。

在實踐中,還是redis做分布式鎖性能會高一些


數據庫悲觀鎖

所謂悲觀鎖,悲觀鎖是對數據被的修改持悲觀態度(認為數據在被修改的時候一定會存在並發問題),因此在整個數據處理過程中將數據鎖定。

悲觀鎖的實現,往往依靠數據庫提供的鎖機制(也只有數據庫層提供的鎖機制才能真正保證數據訪問的排他性,否則,即使在應用層中實現了加鎖機制,也無法保證外部系統不會修改數據)。

數據庫的行鎖、表鎖、排他鎖等都是悲觀鎖,這里以行鎖為例,進行介紹。以我們常用的MySQL為例,我們通過使用select...for update語句, 執行該語句后,會在表上加持行鎖,一直到事務提交,解除行鎖。

使用場景舉例:

在秒殺案例中,生成訂單和扣減庫存的操作,可以通過商品記錄的行鎖,進行保護。們通過使用select...for update語句,在查詢商品表庫存時將該條記錄加鎖,待下單減庫存完成后,再釋放鎖。

示例的SQL如下:

//0.開始事務
begin; 
	
//1.查詢出商品信息

select stockCount from seckill_good where id=1 for update;

//2.根據商品信息生成訂單

insert into seckill_order (id,good_id) values (null,1);

//3.修改商品stockCount減一

update seckill_good set stockCount=stockCount-1 where id=1;

//4.提交事務

commit;


以上,在對id = 1的記錄修改前,先通過for update的方式進行加鎖,然后再進行修改。這就是比較典型的悲觀鎖策略。

如果以上修改庫存的代碼發生並發,同一時間只有一個線程可以開啟事務並獲得id=1的鎖,其它的事務必須等本次事務提交之后才能執行。這樣我們可以保證當前的數據不會被其它事務修改。

我們使用select_for_update,另外一定要寫在事務中.

注意:要使用悲觀鎖,我們必須關閉mysql數據庫中自動提交的屬性,命令set autocommit=0;即可關閉,因為MySQL默認使用autocommit模式,也就是說,當你執行一個更新操作后,MySQL會立刻將結果進行提交。

悲觀鎖的實現,往往依靠數據庫提供的鎖機制。在數據庫中,悲觀鎖的流程如下:

  • 在對記錄進行修改前,先嘗試為該記錄加上排他鎖(exclusive locking)。
  • 如果加鎖失敗,說明該記錄正在被修改,那么當前查詢可能要等待或者拋出異常。具體響應方式由開發者根據實際需要決定。
  • 如果成功加鎖,那么就可以對記錄做修改,事務完成后就會解鎖了。
  • 其間如果有其他事務對該記錄做加鎖的操作,都要等待當前事務解鎖或直接拋出異常。

數據庫樂觀鎖

使用樂觀鎖就不需要借助數據庫的鎖機制了。

樂觀鎖的概念中其實已經闡述了他的具體實現細節:主要就是兩個步驟:沖突檢測和數據更新。其實現方式有一種比較典型的就是Compare and Swap(CAS)技術

CAS是項樂觀鎖技術,當多個線程嘗試使用CAS同時更新同一個變量時,只有其中一個線程能更新變量的值,而其它線程都失敗,失敗的線程並不會被掛起,而是被告知這次競爭中失敗,並可以再次嘗試。

CAS的實現中,在表中增加一個version字段,操作前先查詢version信息,在數據提交時檢查version字段是否被修改,如果沒有被修改則進行提交,否則認為是過期數據。

比如前面的扣減庫存問題,通過樂觀鎖可以實現如下:

//1.查詢出商品信息			
select stockCount, version from seckill_good where id=1;
			
//2.根據商品信息生成訂單
insert into seckill_order (id,good_id) values (null,1);

//3.修改商品庫存
update seckill_good set stockCount=stockCount-1, version = version+1 where id=1, version=version;

以上,我們在更新之前,先查詢一下庫存表中當前版本(version),然后在做update的時候,以version 作為一個修改條件。

當我們提交更新的時候,判斷數據庫表對應記錄的當前version與第一次取出來的version進行比對,如果數據庫表當前version與第一次取出來的version相等,則予以更新,否則認為是過期數據。

CAS 樂觀鎖有兩個問題:

(1) CAS 存在一個比較重要的問題,即ABA問題. 解決的辦法是version字段順序遞增。

(2) 樂觀鎖的方式,在高並發時,只有一個線程能執行成功,會造成大量的失敗,這給用戶的體驗顯然是很不好的。


Zookeeper分布式鎖

除了在數據庫層面加分布式鎖,通常還可以使用以下更高性能、更高可用的分布式鎖:

  • 分布式緩存(如redis)鎖
  • 分布式協調(如zookeeper)鎖

有關zookeeper分布式鎖的原理和實現,具體請參見下面的博客:
Zookeeper 分布式鎖 (圖解+秒懂+史上最全)

或者閱讀筆者的《Java高並發核心編程(卷1加強版)》

在這里插入圖片描述

說明:本文會以pdf格式持續更新,更多最新尼恩3高pdf筆記,請從下面的鏈接獲取:語雀 或者 碼雲


Redis分布式鎖

本文重點介紹Redis分布式鎖,分為兩個維度進行介紹:

(1)基於Jedis手工造輪子分布式鎖

(2)介紹Redission 分布式鎖的使用和原理。

分布式鎖一般有如下的特點:

  • 互斥性: 同一時刻只能有一個線程持有鎖
  • 可重入性: 同一節點上的同一個線程如果獲取了鎖之后能夠再次獲取鎖
  • 鎖超時:和J.U.C中的鎖一樣支持鎖超時,防止死鎖
  • 高性能和高可用: 加鎖和解鎖需要高效,同時也需要保證高可用,防止分布式鎖失效
  • 具備阻塞和非阻塞性:能夠及時從阻塞狀態中被喚醒

手工造輪子:基於Jedis 的API實現分布式鎖

我們首先講解 Jedis 普通分布式鎖實現,並且是純手工的模式,從最為基礎的Redis命令開始。

只有充分了解與分布式鎖相關的普通Redis命令,才能更好的了解高級的Redis分布式鎖的實現,因為高級的分布式鎖的實現完全基於普通Redis命令。

Redis幾種架構

Redis發展到現在,幾種常見的部署架構有:

  • 單機模式;
  • 主從模式;
  • 哨兵模式;
  • 集群模式;

從分布式鎖的角度來說, 無論是單機模式、主從模式、哨兵模式、集群模式,其原理都是類同的。 只是主從模式、哨兵模式、集群模式的更加的高可用、或者更加高並發。

所以,接下來先基於單機模式,基於Jedis手工造輪子實現自己的分布式鎖。

首先看兩個命令:

Redis分布式鎖機制,主要借助setnx和expire兩個命令完成。

setnx命令:

SETNX 是SET if Not eXists的簡寫。將 key 的值設為 value,當且僅當 key 不存在; 若給定的 key 已經存在,則 SETNX 不做任何動作。

下面為客戶端使用示例:

127.0.0.1:6379> set lock "unlock"
OK
127.0.0.1:6379> setnx lock "unlock"
(integer) 0
127.0.0.1:6379> setnx lock "lock"
(integer) 0
127.0.0.1:6379> 

expire命令:

expire命令為 key 設置生存時間,當 key 過期時(生存時間為 0 ),它會被自動刪除. 其格式為:

EXPIRE key seconds

下面為客戶端使用示例:

127.0.0.1:6379> expire lock 10
(integer) 1
127.0.0.1:6379> ttl lock
8

基於Jedis API的分布式鎖的總體流程:

通過Redis的setnx、expire命令可以實現簡單的鎖機制:

  • key不存在時創建,並設置value和過期時間,返回值為1;成功獲取到鎖;
  • 如key存在時直接返回0,搶鎖失敗;
  • 持有鎖的線程釋放鎖時,手動刪除key; 或者過期時間到,key自動刪除,鎖釋放。

線程調用setnx方法成功返回1認為加鎖成功,其他線程要等到當前線程業務操作完成釋放鎖后,才能再次調用setnx加鎖成功。

在這里插入圖片描述

以上簡單redis分布式鎖的問題:

如果出現了這么一個問題:如果setnx是成功的,但是expire設置失敗,一旦出現了釋放鎖失敗,或者沒有手工釋放,那么這個鎖永遠被占用,其他線程永遠也搶不到鎖。

所以,需要保障setnx和expire兩個操作的原子性,要么全部執行,要么全部不執行,二者不能分開。

解決的辦法有兩種:

  • 使用set的命令時,同時設置過期時間,不再單獨使用 expire命令
  • 使用lua腳本,將加鎖的命令放在lua腳本中原子性的執行

簡單加鎖:使用set的命令時,同時設置過期時間

使用set的命令時,同時設置過期時間的示例如下:

127.0.0.1:6379> set unlock "234" EX 100 NX
(nil)
127.0.0.1:6379> 
127.0.0.1:6379> set test "111" EX 100 NX
OK

這樣就完美的解決了分布式鎖的原子性; set 命令的完整格式:

set key value [EX seconds] [PX milliseconds] [NX|XX]

EX seconds:設置失效時長,單位秒
PX milliseconds:設置失效時長,單位毫秒
NX:key不存在時設置value,成功返回OK,失敗返回(nil)
XX:key存在時設置value,成功返回OK,失敗返回(nil)

使用set命令實現加鎖操作,先展示加鎖的簡單代碼實習,再帶大家慢慢解釋為什么這樣實現。

加鎖的簡單代碼實現

package com.crazymaker.springcloud.standard.lock;

@Slf4j
@Data
@AllArgsConstructor
public class JedisCommandLock {

    private  RedisTemplate redisTemplate;

    private static final String LOCK_SUCCESS = "OK";
    private static final String SET_IF_NOT_EXIST = "NX";
    private static final String SET_WITH_EXPIRE_TIME = "PX";

    /**
     * 嘗試獲取分布式鎖
     * @param jedis Redis客戶端
     * @param lockKey 鎖
     * @param requestId 請求標識
     * @param expireTime 超期時間
     * @return 是否獲取成功
     */
    public static   boolean tryGetDistributedLock(Jedis jedis, String lockKey, String requestId, int expireTime) {

        String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);

        if (LOCK_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }
}

可以看到,我們加鎖用到了Jedis的set Api

jedis.set(String key, String value, String nxxx, String expx, int time)

這個set()方法一共有五個形參:

  • 第一個為key,我們使用key來當鎖,因為key是唯一的。

  • 第二個為value,我們傳的是requestId,很多童鞋可能不明白,有key作為鎖不就夠了嗎,為什么還要用到value?原因就是我們在上面講到可靠性時,分布式鎖要滿足第四個條件解鈴還須系鈴人,通過給value賦值為requestId,我們就知道這把鎖是哪個請求加的了,在解鎖的時候就可以有依據。

    requestId可以使用UUID.randomUUID().toString()方法生成。

  • 第三個為nxxx,這個參數我們填的是NX,意思是SET IF NOT EXIST,即當key不存在時,我們進行set操作;若key已經存在,則不做任何操作;

  • 第四個為expx,這個參數我們傳的是PX,意思是我們要給這個key加一個過期的設置,具體時間由第五個參數決定。

  • 第五個為time,與第四個參數相呼應,代表key的過期時間。

總的來說,執行上面的set()方法就只會導致兩種結果:

  1. 當前沒有鎖(key不存在),那么就進行加鎖操作,並對鎖設置個有效期,同時value表示加鎖的客戶端。
  2. 已有鎖存在,不做任何操作。

心細的童鞋就會發現了,我們的加鎖代碼滿足前面描述的四個條件中的三個。

  • 首先,set()加入了NX參數,可以保證如果已有key存在,則函數不會調用成功,也就是只有一個客戶端能持有鎖,滿足互斥性。

  • 其次,由於我們對鎖設置了過期時間,即使鎖的持有者后續發生崩潰而沒有解鎖,鎖也會因為到了過期時間而自動解鎖(即key被刪除),不會被永遠占用(而發生死鎖)。

  • 最后,因為我們將value賦值為requestId,代表加鎖的客戶端請求標識,那么在客戶端在解鎖的時候就可以進行校驗是否是同一個客戶端。

  • 由於我們只考慮Redis單機部署的場景,所以容錯性我們暫不考慮。

基於Jedis 的API實現簡單解鎖代碼

還是先展示代碼,再帶大家慢慢解釋為什么這樣實現。

解鎖的簡單代碼實現

package com.crazymaker.springcloud.standard.lock;

@Slf4j
@Data
@AllArgsConstructor
public class JedisCommandLock {

    private static final Long RELEASE_SUCCESS = 1L;

    /**
     * 釋放分布式鎖
     * @param jedis Redis客戶端
     * @param lockKey 鎖
     * @param requestId 請求標識
     * @return 是否釋放成功
     */
    public static boolean releaseDistributedLock(Jedis jedis, String lockKey, String requestId) {

        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

        if (RELEASE_SUCCESS.equals(result)) {
            return true;
        }
        return false;
    }
}

那么這段Lua代碼的功能是什么呢?

其實很簡單,首先獲取鎖對應的value值,檢查是否與requestId相等,如果相等則刪除鎖(解鎖)。

第一行代碼,我們寫了一個簡單的Lua腳本代碼。

第二行代碼,我們將Lua代碼傳到jedis.eval()方法里,並使參數KEYS[1]賦值為lockKey,ARGV[1]賦值為requestId。eval()方法是將Lua代碼交給Redis服務端執行。

那么為什么要使用Lua語言來實現呢?

因為要確保上述操作是原子性的。那么為什么執行eval()方法可以確保原子性,源於Redis的特性.

簡單來說,就是在eval命令執行Lua代碼的時候,Lua代碼將被當成一個命令去執行,並且直到eval命令執行完成,Redis才會執行其他命

錯誤示例1

最常見的解鎖代碼就是直接使用 jedis.del() 方法刪除鎖,這種不先判斷鎖的擁有者而直接解鎖的方式,會導致任何客戶端都可以隨時進行解鎖,即使這把鎖不是它的。

public static void wrongReleaseLock1(Jedis jedis, String lockKey) {
    jedis.del(lockKey);
}

錯誤示例2

這種解鎖代碼乍一看也是沒問題,甚至我之前也差點這樣實現,與正確姿勢差不多,唯一區別的是分成兩條命令去執行,代碼如下:

public static void wrongReleaseLock2(Jedis jedis, String lockKey, String requestId) {
        
    // 判斷加鎖與解鎖是不是同一個客戶端
    if (requestId.equals(jedis.get(lockKey))) {
        // 若在此時,這把鎖突然不是這個客戶端的,則會誤解鎖
        jedis.del(lockKey);
    }

}

再造輪子:基於Lua腳本實現分布式鎖

lua腳本的好處

前面提到,在redis中執行lua腳本,有如下的好處:

那么為什么要使用Lua語言來實現呢?

因為要確保上述操作是原子性的。那么為什么執行eval()方法可以確保原子性,源於Redis的特性.

簡單來說,就是在eval命令執行Lua代碼的時候,Lua代碼將被當成一個命令去執行,並且直到eval命令執行完成,Redis才會執行其他命

所以:

大部分的開源框架(如 redission)中的分布式鎖組件,都是用純lua腳本實現的。

題外話: lua腳本是高並發、高性能的必備腳本語言

有關lua的詳細介紹,請參見以下書籍:

在這里插入圖片描述

那么,我們也來模擬一下

基於純Lua腳本的分布式鎖的執行流程

加鎖和刪除鎖的操作,使用純lua進行封裝,保障其執行時候的原子性。

基於純Lua腳本實現分布式鎖的執行流程,大致如下:

在這里插入圖片描述

加鎖的Lua腳本: lock.lua

--- -1 failed
--- 1 success
---
local key = KEYS[1]
local requestId = KEYS[2]
local ttl = tonumber(KEYS[3])
local result = redis.call('setnx', key, requestId)
if result == 1 then
    --PEXPIRE:以毫秒的形式指定過期時間
    redis.call('pexpire', key, ttl)
else
    result = -1;
    -- 如果value相同,則認為是同一個線程的請求,則認為重入鎖
    local value = redis.call('get', key)
    if (value == requestId) then
        result = 1;
        redis.call('pexpire', key, ttl)
    end
end
--  如果獲取鎖成功,則返回 1
return result

解鎖的Lua腳本: unlock.lua:

--- -1 failed
--- 1 success

-- unlock key
local key = KEYS[1]
local requestId = KEYS[2]
local value = redis.call('get', key)
if value == requestId then
    redis.call('del', key);
    return 1;
end
return -1

兩個文件,放在資源文件夾下備用:

在這里插入圖片描述

在Java中調用lua腳本,完成加鎖操作

package com.crazymaker.springcloud.standard.lock;

import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.common.util.IOUtil;
import com.crazymaker.springcloud.standard.context.SpringContextUtil;
import com.crazymaker.springcloud.standard.lua.ScriptHolder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisCallback;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;

import java.util.ArrayList;
import java.util.List;

@Slf4j
public class InnerLock {

    private RedisTemplate redisTemplate;


    public static final Long LOCKED = Long.valueOf(1);
    public static final Long UNLOCKED = Long.valueOf(1);
    public static final int EXPIRE = 2000;

    String key;
    String requestId;  // lockValue 鎖的value ,代表線程的uuid

    /**
     * 默認為2000ms
     */
    long expire = 2000L;


    private volatile boolean isLocked = false;
    private RedisScript lockScript;
    private RedisScript unLockScript;


    public InnerLock(String lockKey, String requestId) {
        this.key = lockKey;
        this.requestId = requestId;
        lockScript = ScriptHolder.getLockScript();
        unLockScript = ScriptHolder.getUnlockScript();
    }

    /**
     * 搶奪鎖
     */
    public void lock() {
        if (null == key) {
            return;
        }
        try {
            List<String> redisKeys = new ArrayList<>();
            redisKeys.add(key);
            redisKeys.add(requestId);
            redisKeys.add(String.valueOf(expire));

            Long res = (Long) getRedisTemplate().execute(lockScript, redisKeys);
            isLocked = false;
        } catch (Exception e) {
            e.printStackTrace();
            throw BusinessException.builder().errMsg("搶鎖失敗").build();
        }
    }

    /**
     * 有返回值的搶奪鎖
     *
     * @param millisToWait
     */
    public boolean lock(Long millisToWait) {
        if (null == key) {
            return false;
        }
        try {
            List<String> redisKeys = new ArrayList<>();
            redisKeys.add(key);
            redisKeys.add(requestId);
            redisKeys.add(String.valueOf(millisToWait));
            Long res = (Long) getRedisTemplate().execute(lockScript, redisKeys);

            return res != null && res.equals(LOCKED);
        } catch (Exception e) {
            e.printStackTrace();
            throw BusinessException.builder().errMsg("搶鎖失敗").build();
        }

    }

    //釋放鎖
    public void unlock() {
        if (key == null || requestId == null) {
            return;
        }
        try {
            List<String> redisKeys = new ArrayList<>();
            redisKeys.add(key);
            redisKeys.add(requestId);
            Long res = (Long) getRedisTemplate().execute(unLockScript, redisKeys);

//            boolean unlocked = res != null && res.equals(UNLOCKED);


        } catch (Exception e) {
            e.printStackTrace();
            throw BusinessException.builder().errMsg("釋放鎖失敗").build();

        }
    }

    private RedisTemplate getRedisTemplate() {
        if(null==redisTemplate)
        {
            redisTemplate= (RedisTemplate) SpringContextUtil.getBean("stringRedisTemplate");
        }
        return redisTemplate;
    }
}

在Java中調用lua腳本,完成加鎖操作

下一步,實現Lock接口, 完成JedisLock的分布式鎖。

其加鎖操作,通過調用 lock.lua腳本完成,代碼如下:

package com.crazymaker.springcloud.standard.lock;

import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.common.util.ThreadUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

@Slf4j
@Data
@AllArgsConstructor
public class JedisLock implements Lock {

    private RedisTemplate redisTemplate;

    RedisScript<Long> lockScript = null;
    RedisScript<Long> unLockScript = null;

    public static final int DEFAULT_TIMEOUT = 2000;
    public static final Long LOCKED = Long.valueOf(1);
    public static final Long UNLOCKED = Long.valueOf(1);
    public static final Long WAIT_GAT = Long.valueOf(200);
    public static final int EXPIRE = 2000;


    String key;
    String lockValue;  // lockValue 鎖的value ,代表線程的uuid

    /**
     * 默認為2000ms
     */
    long expire = 2000L;

    public JedisLock(String lockKey, String lockValue) {
        this.key = lockKey;
        this.lockValue = lockValue;
    }

    private volatile boolean isLocked = false;

    private Thread thread;

    /**
     * 獲取一個分布式鎖 , 超時則返回失敗
     *
     * @return 獲鎖成功 - true | 獲鎖失敗 - false
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

        //本地可重入
        if (isLocked && thread == Thread.currentThread()) {
            return true;
        }
        expire = unit != null ? unit.toMillis(time) : DEFAULT_TIMEOUT;
        long startMillis = System.currentTimeMillis();
        Long millisToWait = expire;

        boolean localLocked = false;

        int turn = 1;
        while (!localLocked) {

            localLocked = this.lockInner(expire);
            if (!localLocked) {
                millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
                startMillis = System.currentTimeMillis();
                if (millisToWait > 0L) {
                    /**
                     * 還沒有超時
                     */
                    ThreadUtil.sleepMilliSeconds(WAIT_GAT);
                    log.info("睡眠一下,重新開始,turn:{},剩余時間:{}", turn++, millisToWait);
                } else {
                    log.info("搶鎖超時");
                    return false;
                }
            } else {
                isLocked = true;
                localLocked = true;
            }
        }
        return isLocked;
    }


  
    /**
     * 有返回值的搶奪鎖
     *
     * @param millisToWait
     */
    public boolean lockInner(Long millisToWait) {
        if (null == key) {
            return false;
        }
        try {
            List<String> redisKeys = new ArrayList<>();
            redisKeys.add(key);
            redisKeys.add(lockValue);
            redisKeys.add(String.valueOf(millisToWait));
            Long res = (Long) redisTemplate.execute(lockScript, redisKeys);

            return res != null && res.equals(LOCKED);
        } catch (Exception e) {
            e.printStackTrace();
            throw BusinessException.builder().errMsg("搶鎖失敗").build();
        }

    }

   
}

實現JUC的Lock顯示鎖接口,實現一個簡單的分布式鎖

其解鎖操作,通過調用unlock.lua腳本完成,代碼如下:

package com.crazymaker.springcloud.standard.lock;

import com.crazymaker.springcloud.common.exception.BusinessException;
import com.crazymaker.springcloud.common.util.ThreadUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

@Slf4j
@Data
@AllArgsConstructor
public class JedisLock implements Lock {

    private RedisTemplate redisTemplate;

    RedisScript<Long> lockScript = null;
    RedisScript<Long> unLockScript = null;

    //釋放鎖
    @Override
    public void unlock() {
        if (key == null || requestId == null) {
            return;
        }
        try {
            List<String> redisKeys = new ArrayList<>();
            redisKeys.add(key);
            redisKeys.add(requestId);
            Long res = (Long) redisTemplate.execute(unLockScript, redisKeys);

        } catch (Exception e) {
            e.printStackTrace();
            throw BusinessException.builder().errMsg("釋放鎖失敗").build();

        }
    }
  
   
}

編寫RedisLockService用於管理JedisLock

編寫個分布式鎖服務,用於加載lua腳本,創建 分布式鎖,代碼如下:

package com.crazymaker.springcloud.standard.lock;

import com.crazymaker.springcloud.common.util.IOUtil;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.data.redis.core.script.RedisScript;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

@Slf4j
@Data
public class RedisLockService
{

    private RedisTemplate redisTemplate;

    static String lockLua = "script/lock.lua";
    static String unLockLua = "script/unlock.lua";
    static RedisScript<Long> lockScript = null;
    static RedisScript<Long> unLockScript = null;
    {
        String script = IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),lockLua);
//        String script = FileUtil.readString(lockLua, Charset.forName("UTF-8" ));
        if(StringUtils.isEmpty(script))
        {
            log.error("lua load failed:"+lockLua);
        }

        lockScript = new DefaultRedisScript<>(script, Long.class);

//        script = FileUtil.readString(unLockLua, Charset.forName("UTF-8" ));
        script =  IOUtil.loadJarFile(RedisLockService.class.getClassLoader(),unLockLua);
        if(StringUtils.isEmpty(script))
        {
            log.error("lua load failed:"+unLockLua);
        }
        unLockScript = new DefaultRedisScript<>(script, Long.class);

    }

    public RedisLockService(RedisTemplate redisTemplate)
    {
        this.redisTemplate = redisTemplate;
    }


    public Lock getLock(String lockKey, String lockValue) {
        JedisLock lock=new JedisLock(lockKey,lockValue);
        lock.setRedisTemplate(redisTemplate);
        lock.setLockScript(lockScript);
        lock.setUnLockScript(unLockScript);
        return lock;
    }
}

測試用例

接下來,終於可以上測試用例了

package com.crazymaker.springcloud.lock;

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = {DemoCloudApplication.class})
// 指定啟動類
public class RedisLockTest {

    @Resource
    RedisLockService redisLockService;

    private ExecutorService pool = Executors.newFixedThreadPool(10);

    @Test
    public void testLock() {
        int threads = 10;
        final int[] count = {0};
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                String lockValue = UUID.randomUUID().toString();

                try {
                    Lock lock = redisLockService.getLock("test:lock:1", lockValue);
                    boolean locked = lock.tryLock(10, TimeUnit.SECONDS);

                    if (locked) {
                        for (int j = 0; j < 1000; j++) {
                            count[0]++;
                        }

                        log.info("count = " + count[0]);
                        lock.unlock();
                    } else {
                        System.out.println("搶鎖失敗");
                    }


                } catch (Exception e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("10個線程每個累加1000為: = " + count[0]);
        //輸出統計結果
        float time = System.currentTimeMillis() - start;

        System.out.println("運行的時長為(ms):" + time);
        System.out.println("每一次執行的時長為(ms):" + time / count[0]);

    }

}

執行用例,結果如下:

2021-05-04 23:02:11.900  INFO 22120 --- [pool-1-thread-7] c.c.springcloud.lock.RedisLockTest       LN:50 count = 6000
2021-05-04 23:02:11.901  INFO 22120 --- [pool-1-thread-1] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新開始,turn:3,剩余時間:9585
2021-05-04 23:02:11.902  INFO 22120 --- [pool-1-thread-1] c.c.springcloud.lock.RedisLockTest       LN:50 count = 7000
2021-05-04 23:02:12.100  INFO 22120 --- [pool-1-thread-4] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新開始,turn:3,剩余時間:9586
2021-05-04 23:02:12.101  INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新開始,turn:3,剩余時間:9585
2021-05-04 23:02:12.101  INFO 22120 --- [pool-1-thread-8] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新開始,turn:3,剩余時間:9585
2021-05-04 23:02:12.101  INFO 22120 --- [pool-1-thread-4] c.c.springcloud.lock.RedisLockTest       LN:50 count = 8000
2021-05-04 23:02:12.102  INFO 22120 --- [pool-1-thread-8] c.c.springcloud.lock.RedisLockTest       LN:50 count = 9000
2021-05-04 23:02:12.304  INFO 22120 --- [pool-1-thread-5] c.c.springcloud.standard.lock.JedisLock  LN:81 睡眠一下,重新開始,turn:4,剩余時間:9383
2021-05-04 23:02:12.307  INFO 22120 --- [pool-1-thread-5] c.c.springcloud.lock.RedisLockTest       LN:50 count = 10000
10個線程每個累加1000為: = 10000
運行的時長為(ms):827.0
每一次執行的時長為(ms):0.0827

STW導致的鎖過期問題

下面有一個簡單的使用鎖的例子,在10秒內占着鎖:

  //寫數據到文件
function writeData(filename, data) {
    boolean locked = lock.tryLock(10, TimeUnit.SECONDS);
    if (!locked) {
        throw 'Failed to acquire lock';
    }

    try {
        //將數據寫到文件
        var file = storage.readFile(filename);
        var updated = updateContents(file, data);
        storage.writeFile(filename, updated);
    } finally {
        lock.unlock();
    }
}

問題是:如果在寫文件過程中,發生了 fullGC,並且其時間跨度較長, 超過了10秒, 那么,分布式就自動釋放了。

在此過程中,client2 搶到鎖,寫了文件。

client1 的fullGC完成后,也繼續寫文件,注意,此時client1 的並沒有占用鎖,此時寫入會導致文件數據錯亂,發生線程安全問題。

這就是STW導致的鎖過期問題。

STW導致的鎖過期問題,具體如下圖所示:

在這里插入圖片描述

STW導致的鎖過期問題,大概的解決方案,有:

1: 模擬CAS樂觀鎖的方式,增加版本號

2:watch dog自動延期機制

1: 模擬CAS樂觀鎖的方式,增加版本號(如下圖中的token)

c

此方案如果要實現,需要調整業務邏輯,與之配合,所以會入侵代碼。

2:watch dog自動延期機制

客戶端1加鎖的鎖key默認生存時間才30秒,如果超過了30秒,客戶端1還想一直持有這把鎖,怎么辦呢?

簡單!只要客戶端1一旦加鎖成功,就會啟動一個watch dog看門狗,他是一個后台線程,會每隔10秒檢查一下,如果客戶端1還持有鎖key,那么就會不斷的延長鎖key的生存時間。

redission,采用的就是這種方案, 此方案不會入侵業務代碼。

注意:

單機版的watch dog 並不能解決 STW的過期問題, 需要分布式版本的 watch dog, 獨立的看門狗服務。

鎖刪除之后, 取消看門狗服務的 對應的key記錄, 當然,這就使得系統變得復雜, 還要保證看門狗服務的高並發、高可用、數據一致性的問題。

為啥推薦使用Redission

作為 Java 開發人員,我們若想在程序中集成 Redis,必須使用 Redis 的第三方庫。目前大家使用的最多的第三方庫是jedis。

和SpringCloud gateway一樣,Redisson也是基於Netty實現的,是更高性能的第三方庫。 所以,這里推薦大家使用Redission替代 jedis。

在使用Redission之前,建議大家先掌握Netty的知識。

推薦大家閱讀被很多小伙伴評價為史上最為易懂的NIO、Netty書籍:《Java高並發核心編程(卷1)》

在這里插入圖片描述

說明:本文會以pdf格式持續更新,更多最新尼恩3高pdf筆記,請從下面的鏈接獲取:語雀 或者 碼雲


Redisson簡介

Redisson是一個在Redis的基礎上實現的Java駐內存數據網格(In-Memory Data Grid)。它不僅提供了一系列的分布式的Java常用對象,還實現了可重入鎖(Reentrant Lock)、公平鎖(Fair Lock、聯鎖(MultiLock)、 紅鎖(RedLock)、 讀寫鎖(ReadWriteLock)等,還提供了許多分布式服務。

img

Redisson提供了使用Redis的最簡單和最便捷的方法。Redisson的宗旨是促進使用者對Redis的關注分離(Separation of Concern),從而讓使用者能夠將精力更集中地放在處理業務邏輯上。

img

Redisson與Jedis的對比

1.概況對比

Jedis是Redis的java實現的客戶端,其API提供了比較全面的的Redis命令的支持,Redisson實現了分布式和可擴展的的java數據結構,和Jedis相比,功能較為簡單,不支持字符串操作,不支持排序,事物,管道,分區等Redis特性。Redisson的宗旨是促進使用者對Redis的關注分離,從而讓使用者能夠將精力更集中的放在處理業務邏輯上。

2.可伸縮性

Jedis使用阻塞的I/O,且其方法調用都是同步的,程序流程要等到sockets處理完I/O才能執行,不支持異步,Jedis客戶端實例不是線程安全的,所以需要通過連接池來使用Jedis。

Redisson使用非阻塞的I/O和基於Netty框架的事件驅動的通信層,其方法調用時異步的。Redisson的API是線程安全的,所以操作單個Redisson連接來完成各種操作。

3.第三方框架整合

Redisson在Redis的基礎上實現了java緩存標准規范;Redisson還提供了Spring Session回話管理器的實現。

Redission 的源碼地址:

官網: https://redisson.org/

github: https://github.com/redisson/redisson#quick-start

特性 & 功能:

  • 支持 Redis 單節點(single)模式、哨兵(sentinel)模式、主從(Master/Slave)模式以及集群(Redis Cluster)模式

  • 程序接口調用方式采用異步執行和異步流執行兩種方式

  • 數據序列化,Redisson 的對象編碼類是用於將對象進行序列化和反序列化,以實現對該對象在 Redis 里的讀取和存儲

  • 單個集合數據分片,在集群模式下,Redisson 為單個 Redis 集合類型提供了自動分片的功能

  • 提供多種分布式對象,如:Object Bucket,Bitset,AtomicLong,Bloom Filter 和 HyperLogLog 等

  • 提供豐富的分布式集合,如:Map,Multimap,Set,SortedSet,List,Deque,Queue 等

  • 分布式鎖和同步器的實現,可重入鎖(Reentrant Lock),公平鎖(Fair Lock),聯鎖(MultiLock),紅鎖(Red Lock),信號量(Semaphonre),可過期性信號鎖(PermitExpirableSemaphore)等

  • 提供先進的分布式服務,如分布式遠程服務(Remote Service),分布式實時對象(Live Object)服務,分布式執行服務(Executor Service),分布式調度任務服務(Schedule Service)和分布式映射歸納服務(MapReduce)

說明:本文會以pdf格式持續更新,更多最新尼恩3高pdf筆記,請從下面的鏈接獲取:語雀 或者 碼雲


Redisson的使用

如何安裝 Redisson

安裝 Redisson 最便捷的方法是使用 Maven 或者 Gradle:

•Maven

<dependency>	
    <groupId>org.redisson</groupId>	
    <artifactId>redisson</artifactId>	
    <version>3.11.4</version>	
</dependency>

•Gradle

compile group: 'org.redisson', name: 'redisson', version: '3.11.4'

目前 Redisson 最新版是 3.11.4,當然你也可以通過搜索 Maven 中央倉庫 mvnrepository[1] 來找到 Redisson 的各種版本。

獲取RedissonClient對象

RedissonClient有多種模式,主要的模式有:

  • 單節點模式

  • 哨兵模式

  • 主從模式

  • 集群模式

首先介紹單節點模式。

單節點模式的程序化配置方法,大致如下:

Config config = new Config();
config.useSingleServer().setAddress("redis://myredisserver:6379");
RedissonClient redisson = Redisson.create(config);xxxxxxxxxx Config config = new Config();config.useSingleServer().setAddress("redis://myredisserver:6379");RedissonClient redisson = Redisson.create(config);// connects to 127.0.0.1:6379 by defaultRedissonClient redisson = Redisson.create();
SingleServerConfig singleConfig = config.useSingleServer();

SingleServerConfig類的設置參數如下

address(節點地址)

可以通過host:port的格式來指定節點地址。

subscriptionConnectionMinimumIdleSize(發布和訂閱連接的最小空閑連接數)

默認值:1

用於發布和訂閱連接的最小保持連接數(長連接)。Redisson內部經常通過發布和訂閱來實現許多功能。長期保持一定數量的發布訂閱連接是必須的。

subscriptionConnectionPoolSize(發布和訂閱連接池大小)

默認值:50

用於發布和訂閱連接的連接池最大容量。連接池的連接數量自動彈性伸縮。

connectionMinimumIdleSize(最小空閑連接數)

默認值:32

最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時寫入反應速度。

connectionPoolSize(連接池大小)

默認值:64

連接池最大容量。連接池的連接數量自動彈性伸縮。

dnsMonitoring(是否啟用DNS監測)

默認值:false

在啟用該功能以后,Redisson將會監測DNS的變化情況。

dnsMonitoringInterval(DNS監測時間間隔,單位:毫秒)

默認值:5000

監測DNS的變化情況的時間間隔。

idleConnectionTimeout(連接空閑超時,單位:毫秒)

默認值:10000

如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,並從連接池里去掉。時間單位是毫秒。

connectTimeout(連接超時,單位:毫秒)

默認值:10000

同節點建立連接時的等待超時。時間單位是毫秒。

timeout(命令等待超時,單位:毫秒)

默認值:3000

等待節點回復命令的時間。該時間從命令發送成功時開始計時。

retryAttempts(命令失敗重試次數)

默認值:3

如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時。

retryInterval(命令重試發送時間間隔,單位:毫秒)

默認值:1500

在一條命令發送失敗以后,等待重試發送的時間間隔。時間單位是毫秒。

reconnectionTimeout(重新連接時間間隔,單位:毫秒)

默認值:3000

當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔。時間單位是毫秒。

failedAttempts(執行失敗最大次數)

默認值:3

在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試。

database(數據庫編號)

默認值:0

嘗試連接的數據庫編號。

password(密碼)

默認值:null

用於節點身份驗證的密碼。

subscriptionsPerConnection(單個連接最大訂閱數量)

默認值:5

每個連接的最大訂閱數量。

clientName(客戶端名稱)

默認值:null

在Redis節點里顯示的客戶端名稱。

sslEnableEndpointIdentification(啟用SSL終端識別)

默認值:true

開啟SSL終端識別能力。

sslProvider(SSL實現方式)

默認值:JDK

確定采用哪種方式(JDK或OPENSSL)來實現SSL連接。

sslTruststore(SSL信任證書庫路徑)

默認值:null

指定SSL信任證書庫的路徑。

sslTruststorePassword(SSL信任證書庫密碼)

默認值:null

指定SSL信任證書庫的密碼。

sslKeystore(SSL鑰匙庫路徑)

默認值:null

指定SSL鑰匙庫的路徑。

sslKeystorePassword(SSL鑰匙庫密碼)

默認值:null

指定SSL鑰匙庫的密碼。

SpringBoot整合Redisson

Redisson有多種模式,首先介紹單機模式的整合。

一、導入Maven依賴

<!-- redisson-springboot -->
   <dependency>
       <groupId>org.redisson</groupId>
       <artifactId>redisson-spring-boot-starter</artifactId>
       <version>3.11.4</version>
   </dependency>

二、核心配置文件

spring:
  redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    timeout: 5000

三、添加配置類

RedissonConfig.java

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.data.redis.RedisProperties;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class RedissonConfig {

    @Autowired
    private RedisProperties redisProperties;

    @Bean
    public RedissonClient redissonClient() {
        Config config = new Config();
        String redisUrl = String.format("redis://%s:%s", redisProperties.getHost() + "", redisProperties.getPort() + "");
        config.useSingleServer().setAddress(redisUrl).setPassword(redisProperties.getPassword());
        config.useSingleServer().setDatabase(3);
        return Redisson.create(config);
    }

}

自定義starter

由於redission可以有多種模式,處於學習的目的,將多種模式封裝成一個start,可以學習一下starter的制作。

在這里插入圖片描述

封裝一個RedissonManager,通過策略模式,根據不同的配置類型,創建 RedissionConfig實例,然后創建RedissionClient對象。

在這里插入圖片描述

使用RBucket操作分布式對象

Redission模擬了Java的面向對象編程思想,可以簡單理解為一切皆為對象。

每一個 Redisson 對象 實現了RObject and RExpirable 兩個interfaces.

Usage example:

RObject object = redisson.get...()

object.sizeInMemory();

object.delete();

object.rename("newname");

object.isExists();

// catch expired event
object.addListener(new ExpiredObjectListener() {
   ...
});

// catch delete event
object.addListener(new DeletedObjectListener() {
   ...
});

每一個Redisson 對象的名字,就是 Redis中的 Key.

RMap map = redisson.getMap("mymap");
map.getName(); // = mymap

可以通過 RKeys 接口操作Redis中的keys.

Usage example:

RKeys keys = redisson.getKeys();

Iterable<String> allKeys = keys.getKeys();

Iterable<String> foundedKeys = keys.getKeysByPattern('key*');

long numOfDeletedKeys = keys.delete("obj1", "obj2", "obj3");

long deletedKeysAmount = keys.deleteByPattern("test?");

String randomKey = keys.randomKey();

long keysAmount = keys.count();

keys.flushall();

keys.flushdb();

Redisson通過RBucket接口代表可以訪問任何類型的基礎對象,或者普通對象

RBucket有一系列的工具方法,如compareAndSet(),get(),getAndDelete(),getAndSet(),set(),size(),trySet()等等,用於設值/取值/獲取尺寸。

RBucket普通對象的最大大小,為512兆字節

RBucket<AnyObject> bucket = redisson.getBucket("anyObject");

bucket.set(new AnyObject(1));
AnyObject obj = bucket.get();

bucket.trySet(new AnyObject(3));
bucket.compareAndSet(new AnyObject(4), new AnyObject(5));
bucket.getAndSet(new AnyObject(6));

下面是一個完整的實例:

public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

   @Test
    public void testRBucketExamples() {
        // 默認連接上 127.0.0.1:6379
        RedissonClient client = redissonManager.getRedisson();
        // RList 繼承了 java.util.List 接口
        RBucket<String> rstring  = client.getBucket("redission:test:bucket:string");
        rstring.set("this is a string");

        RBucket<UserDTO> ruser  = client.getBucket("redission:test:bucket:user");
        UserDTO dto = new UserDTO();
        dto.setToken(UUID.randomUUID().toString());
        ruser.set(dto);
        System.out.println("string is: " + rstring.get());
        System.out.println("dto is: " + ruser.get());

        client.shutdown();
    }


}

運行上面的代碼時,可以獲得以下輸出:

string is: this is a string
dto is: UserDTO(id=null, userId=null, username=null, password=null, nickname=null, token=183b6eeb-65a8-4b2a-80c6-cf17c08332ce, createTime=null, updateTime=null, headImgUrl=null, mobile=null, sex=null, enabled=null, type=null, openId=null, isDel=false)

在這里插入圖片描述

使用 RList 操作 Redis 列表

下面的代碼簡單演示了如何在 Redisson 中使用 RList 對象。RList 是 Java 的 List 集合的分布式並發實現。

考慮以下代碼:

public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

    @Test
    public void testListExamples() {
        // 默認連接上 127.0.0.1:6379
        RedissonClient client = redissonManager.getRedisson();
        // RList 繼承了 java.util.List 接口
        RList<String> nameList = client.getList("redission:test:nameList");
        nameList.clear();
        nameList.add("張三");
        nameList.add("李四");
        nameList.add("王五");
        nameList.remove(-1);

        System.out.println("List size: " + nameList.size());


        boolean contains = nameList.contains("李四");
        System.out.println("Is list contains name '李四': " + contains);
        nameList.forEach(System.out::println);

        client.shutdown();
    }


}

運行上面的代碼時,可以獲得以下輸出:

List size: 2
Is list contains name '李四': true
張三
李四

在這里插入圖片描述

使用 RMap 操作 Redis 哈希

Redisson 還包括 RMap,它是 Java Map 集合的分布式並發實現,考慮以下代碼:

public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

    @Test
    public void testListExamples() {
         // 默認連接上 127.0.0.1:6379
        RedissonClient client = redissonManager.getRedisson();
        // RMap 繼承了 java.util.concurrent.ConcurrentMap 接口
        RMap<String, Object> map = client.getMap("redission:test:personalMap");
        map.put("name", "張三");
        map.put("address", "北京");
        map.put("age", new Integer(50));

        System.out.println("Map size: " + map.size());

        boolean contains = map.containsKey("age");
        System.out.println("Is map contains key 'age': " + contains);
        String value = String.valueOf(map.get("name"));
        System.out.println("Value mapped by key 'name': " + value);

        client.shutdown();
    }


}

運行上面的代碼時,將會看到以下輸出:

Map size: 3
Is map contains key 'age': true
Value mapped by key 'name': 張三

在這里插入圖片描述

執行 Lua腳本

Lua是一種開源、簡單易學、輕量小巧的腳本語言,用標准C語言編寫。

其設計的目的就是為了嵌入應用程序中,從而為應用程序提供靈活的擴展和定制功能。

Redis從2.6版本開始支持Lua腳本,Redis使用Lua可以:

  1. 原子操作。Redis會將整個腳本作為一個整體執行,不會被中斷。可以用來批量更新、批量插入
  2. 減少網絡開銷。多個Redis操作合並為一個腳本,減少網絡時延
  3. 代碼復用。客戶端發送的腳本可以存儲在Redis中,其他客戶端可以根據腳本的id調用。
public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

    @Test
    public void testLuaExamples() {
        // 默認連接上 127.0.0.1:6379
        RedissonClient redisson = redissonManager.getRedisson();

        redisson.getBucket("redission:test:foo").set("bar");
        String r = redisson.getScript().eval(RScript.Mode.READ_ONLY,
                "return redis.call('get', 'redission:test:foo')", RScript.ReturnType.VALUE);
        System.out.println("foo: " + r);

        // 通過預存的腳本進行同樣的操作
        RScript s = redisson.getScript();
        // 首先將腳本加載到Redis
        String sha1 = s.scriptLoad("return redis.call('get', 'redission:test:foo')");
        // 返回值 res == 282297a0228f48cd3fc6a55de6316f31422f5d17
        System.out.println("sha1: " + sha1);
        // 再通過SHA值調用腳本
        Future<Object> r1 = redisson.getScript().evalShaAsync(RScript.Mode.READ_ONLY,
                sha1,
                RScript.ReturnType.VALUE,
                Collections.emptyList());
        try {
            System.out.println("res: " + r1.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        client.shutdown();
    }
}

運行上面的代碼時,將會看到以下輸出:

foo: bar
sha1: 282297a0228f48cd3fc6a55de6316f31422f5d17
res: bar

在這里插入圖片描述

使用 RLock 實現 Redis 分布式鎖

RLock 是 Java 中可重入鎖的分布式實現,下面的代碼演示了 RLock 的用法:

public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

 @Test
    public void testLockExamples() {
        // 默認連接上 127.0.0.1:6379
        RedissonClient redisson = redissonManager.getRedisson();
        // RLock 繼承了 java.util.concurrent.locks.Lock 接口
        RLock lock = redisson.getLock("redission:test:lock:1");

        final int[] count = {0};
        int threads = 10;
        ExecutorService pool = Executors.newFixedThreadPool(10);
        CountDownLatch countDownLatch = new CountDownLatch(threads);
        long start = System.currentTimeMillis();
        for (int i = 0; i < threads; i++) {
            pool.submit(() ->
            {
                for (int j = 0; j < 1000; j++) {
                    lock.lock();

                    count[0]++;
                    lock.unlock();
                }
                countDownLatch.countDown();
            });
        }

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("10個線程每個累加1000為: = " + count[0]);
        //輸出統計結果
        float time = System.currentTimeMillis() - start;

        System.out.println("運行的時長為:" + time);
        System.out.println("每一次執行的時長為:" + time/count[0]);
    }

}

此代碼將產生以下輸出:

10個線程每個累加1000為: = 10000
運行的時長為:14172.0
每一次執行的時長為:1.4172

使用 RAtomicLong 實現 Redis 原子操作

RAtomicLong 是 Java 中 AtomicLong 類的分布式“替代品”,用於在並發環境中保存長值。以下示例代碼演示了 RAtomicLong 的用法:


public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

    @Test
    public void testRAtomicLongExamples() {
        // 默認連接上 127.0.0.1:6379
        RedissonClient redisson = redissonManager.getRedisson();
        RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong");
          // 線程數
        final int threads = 10;
        // 每條線程的執行輪數
        final int turns = 1000;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++)
        {
            pool.submit(() ->
            {
                try
                {
                    for (int j = 0; j < turns; j++)
                    {
                        atomicLong.incrementAndGet();
                    }

                } catch (Exception e)
                {
                    e.printStackTrace();
                }
            });
        }

        ThreadUtil.sleepSeconds(5);
        System.out.println("atomicLong: " + atomicLong.get());
        redisson.shutdown();
    }

}

此代碼的輸出將是:


atomicLong: 10000

在這里插入圖片描述

整長型累加器(LongAdder)

基於Redis的Redisson分布式整長型累加器(LongAdder)采用了與java.util.concurrent.atomic.LongAdder類似的接口。通過利用客戶端內置的LongAdder對象,為分布式環境下遞增和遞減操作提供了很高得性能。據統計其性能最高比分布式AtomicLong對象快 12000 倍。

完美適用於分布式統計計量場景。下面是RLongAdder的使用案例:

RLongAdder atomicLong = redisson.getLongAdder("myLongAdder");
atomicLong.add(12);
atomicLong.increment();
atomicLong.decrement();
atomicLong.sum();

以下示例代碼演示了 RLongAdder 的用法:

public class RedissionTest {

    @Resource
    RedissonManager redissonManager;

    @Test
    public void testRAtomicLongExamples() {
        // 默認連接上 127.0.0.1:6379
        RedissonClient redisson = redissonManager.getRedisson();
        RAtomicLong atomicLong = redisson.getAtomicLong("redission:test:myLong");
          // 線程數
        final int threads = 10;
        // 每條線程的執行輪數
        final int turns = 1000;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++)
        {
            pool.submit(() ->
            {
                try
                {
                    for (int j = 0; j < turns; j++)
                    {
                        atomicLong.incrementAndGet();
                    }

                } catch (Exception e)
                {
                    e.printStackTrace();
                }
            });
        }

        ThreadUtil.sleepSeconds(5);
        System.out.println("atomicLong: " + atomicLong.get());
        redisson.shutdown();
    }

}

此代碼將產生以下輸出:

longAdder: 10000
運行的時長為:5085.0
每一次執行的時長為:0.5085

當不再使用整長型累加器對象的時候應該自行手動銷毀,如果Redisson對象被關閉(shutdown)了,則不用手動銷毀。

RLongAdder atomicLong = ...
atomicLong.destroy();

序列化

Redisson的對象編碼類是用於將對象進行序列化和反序列化,以實現對該對象在Redis里的讀取和存儲。Redisson提供了以下幾種的對象編碼應用,以供大家選擇:

編碼類名稱 說明
org.redisson.codec.JsonJacksonCodec Jackson JSON 編碼 默認編碼
org.redisson.codec.AvroJacksonCodec Avro 一個二進制的JSON編碼
org.redisson.codec.SmileJacksonCodec Smile 另一個二進制的JSON編碼
org.redisson.codec.CborJacksonCodec CBOR 又一個二進制的JSON編碼
org.redisson.codec.MsgPackJacksonCodec MsgPack 再來一個二進制的JSON編碼
org.redisson.codec.IonJacksonCodec Amazon Ion 亞馬遜的Ion編碼,格式與JSON類似
org.redisson.codec.KryoCodec Kryo 二進制對象序列化編碼
org.redisson.codec.SerializationCodec JDK序列化編碼
org.redisson.codec.FstCodec FST 10倍於JDK序列化性能而且100%兼容的編碼
org.redisson.codec.LZ4Codec LZ4 壓縮型序列化對象編碼
org.redisson.codec.SnappyCodec Snappy 另一個壓縮型序列化對象編碼
org.redisson.client.codec.JsonJacksonMapCodec 基於Jackson的映射類使用的編碼。可用於避免序列化類的信息,以及用於解決使用byte[]遇到的問題。
org.redisson.client.codec.StringCodec 純字符串編碼(無轉換)
org.redisson.client.codec.LongCodec 純整長型數字編碼(無轉換)
org.redisson.client.codec.ByteArrayCodec 字節數組編碼
org.redisson.codec.CompositeCodec 用來組合多種不同編碼在一起

由Redisson默認的編碼器為二進制編碼器,為了序列化后的內容可見,需要使用Json文本序列化編碼工具類。Redisson提供了編碼器 JsonJacksonCodec,作為Json文本序列化編碼工具類。

問題是:JsonJackson在序列化有雙向引用的對象時,會出現無限循環異常。而fastjson在檢查出雙向引用后會自動用引用符$ref替換,終止循環。

所以,一些特殊場景中:用fastjson能 正常序列化到redis,而JsonJackson則拋出無限循環異常。

為了序列化后的內容可見,所以不用redission其他自帶的,自行實現fastjson編碼器:

package com.crayon.distributedredissionspringbootstarter.codec;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import org.redisson.client.codec.BaseCodec;
import org.redisson.client.protocol.Decoder;
import org.redisson.client.protocol.Encoder;

import java.io.IOException;

public class FastjsonCodec extends BaseCodec {

    private final Encoder encoder = in -> {
        ByteBuf out = ByteBufAllocator.DEFAULT.buffer();
        try {
            ByteBufOutputStream os = new ByteBufOutputStream(out);
            JSON.writeJSONString(os, in, SerializerFeature.WriteClassName);
            return os.buffer();
        } catch (IOException e) {
            out.release();
            throw e;
        } catch (Exception e) {
            out.release();
            throw new IOException(e);
        }
    };

    private final Decoder<Object> decoder = (buf, state) ->
            JSON.parseObject(new ByteBufInputStream(buf), Object.class);

    @Override
    public Decoder<Object> getValueDecoder() {
        return decoder;
    }

    @Override
    public Encoder getValueEncoder() {
        return encoder;
    }
}

替換的方法如下:


 */
@Slf4j
public class StandaloneConfigImpl implements RedissonConfigService {

    @Override
    public Config createRedissonConfig(RedissonConfig redissonConfig) {
        Config config = new Config();
        try {
            String address = redissonConfig.getAddress();
            String password = redissonConfig.getPassword();
            int database = redissonConfig.getDatabase();
            String redisAddr = GlobalConstant.REDIS_CONNECTION_PREFIX.getConstant_value() + address;
            config.useSingleServer().setAddress(redisAddr);
            config.useSingleServer().setDatabase(database);
            //密碼可以為空
            if (!StringUtils.isEmpty(password)) {
                config.useSingleServer().setPassword(password);
            }
            log.info("初始化[單機部署]方式Config,redisAddress:" + address);

//            config.setCodec( new FstCodec());
            config.setCodec( new FastjsonCodec());
        } catch (Exception e) {
            log.error("單機部署 Redisson init error", e);
        }
        return config;
    }
}

哨兵模式

哨兵模式即sentinel模式,配置Redis哨兵服務的官方文檔在這里

哨兵模式實現代碼和單機模式幾乎一樣,唯一的不同就是Config的構造.

程序化配置哨兵模式的方法如下

Config config = new Config();
config.useSentinelServers()
    .setMasterName("mymaster")
    // use "rediss://" for SSL connection
    .addSentinelAddress("redis://127.0.0.1:26389", "redis://127.0.0.1:26379")
    .addSentinelAddress("redis://127.0.0.1:26319");

RedissonClient redisson = Redisson.create(config);

Redisson的哨兵模式的使用方法如下:

SentinelServersConfig sentinelConfig = config.useSentinelServers();

SentinelServersConfig配置參數如下

配置Redis哨兵服務的官方文檔在這里。Redisson的哨兵模式的使用方法如下:SentinelServersConfig sentinelConfig = config.useSentinelServers();

SentinelServersConfig 類的設置參數如下:

dnsMonitoringInterval(DNS監控間隔,單位:毫秒)

默認值:5000

用來指定檢查節點DNS變化的時間間隔。使用的時候應該確保JVM里的DNS數據的緩存時間保持在足夠低的范圍才有意義。用-1來禁用該功能。

masterName(主服務器的名稱)

主服務器的名稱是哨兵進程中用來監測主從服務切換情況的。

addSentinelAddress(添加哨兵節點地址)

可以通過host:port的格式來指定哨兵節點的地址。多個節點可以一次性批量添加。

readMode(讀取操作的負載均衡模式)

默認值: SLAVE(只在從服務節點里讀取)

注:在從服務節點里讀取的數據說明已經至少有兩個節點保存了該數據,確保了數據的高可用性。

設置讀取操作選擇節點的模式。可用值為:SLAVE - 只在從服務節點里讀取。MASTER - 只在主服務節點里讀取。MASTER_SLAVE - 在主從服務節點里都可以讀取。

subscriptionMode(訂閱操作的負載均衡模式)

默認值:SLAVE(只在從服務節點里訂閱)

設置訂閱操作選擇節點的模式。可用值為:SLAVE - 只在從服務節點里訂閱。MASTER - 只在主服務節點里訂閱。

loadBalancer(負載均衡算法類的選擇)

默認值: org.redisson.connection.balancer.RoundRobinLoadBalancer

在使用多個Redis服務節點的環境里,可以選用以下幾種負載均衡方式選擇一個節點:org.redisson.connection.balancer.WeightedRoundRobinBalancer - 權重輪詢調度算法org.redisson.connection.balancer.RoundRobinLoadBalancer - 輪詢調度算法org.redisson.connection.balancer.RandomLoadBalancer - 隨機調度算法

subscriptionConnectionMinimumIdleSize(從節點發布和訂閱連接的最小空閑連接數)

默認值:1

多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的最小保持連接數(長連接)。Redisson內部經常通過發布和訂閱來實現許多功能。長期保持一定數量的發布訂閱連接是必須的。

subscriptionConnectionPoolSize(從節點發布和訂閱連接池大小)

默認值:50

多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的連接池最大容量。連接池的連接數量自動彈性伸縮。

slaveConnectionMinimumIdleSize(從節點最小空閑連接數)

默認值:32

多從節點的環境里,每個 從服務節點里用於普通操作( 發布和訂閱)的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時讀取反映速度。

slaveConnectionPoolSize(從節點連接池大小)

默認值:64

多從節點的環境里,每個 從服務節點里用於普通操作( 發布和訂閱)連接的連接池最大容量。連接池的連接數量自動彈性伸縮。

masterConnectionMinimumIdleSize(主節點最小空閑連接數)

默認值:32

多從節點的環境里,每個 主節點的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時寫入反應速度。

masterConnectionPoolSize(主節點連接池大小)

默認值:64

主節點的連接池最大容量。連接池的連接數量自動彈性伸縮。

idleConnectionTimeout(連接空閑超時,單位:毫秒)

默認值:10000

如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,並從連接池里去掉。時間單位是毫秒。

connectTimeout(連接超時,單位:毫秒)

默認值:10000

同任何節點建立連接時的等待超時。時間單位是毫秒。

timeout(命令等待超時,單位:毫秒)

默認值:3000

等待節點回復命令的時間。該時間從命令發送成功時開始計時。

retryAttempts(命令失敗重試次數)

默認值:3

如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時。

retryInterval(命令重試發送時間間隔,單位:毫秒)

默認值:1500

在一條命令發送失敗以后,等待重試發送的時間間隔。時間單位是毫秒。

reconnectionTimeout(重新連接時間間隔,單位:毫秒)

默認值:3000

當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔。時間單位是毫秒。

failedAttempts(執行失敗最大次數)

默認值:3

在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試。

database(數據庫編號)

默認值:0

嘗試連接的數據庫編號。

password(密碼)

默認值:null

用於節點身份驗證的密碼。

subscriptionsPerConnection(單個連接最大訂閱數量)

默認值:5

每個連接的最大訂閱數量。

clientName(客戶端名稱)

默認值:null

在Redis節點里顯示的客戶端名稱。

sslEnableEndpointIdentification(啟用SSL終端識別)

默認值:true

開啟SSL終端識別能力。

sslProvider(SSL實現方式)

默認值:JDK

確定采用哪種方式(JDK或OPENSSL)來實現SSL連接。

sslTruststore(SSL信任證書庫路徑)

默認值:null

指定SSL信任證書庫的路徑。

sslTruststorePassword(SSL信任證書庫密碼)

默認值:null

指定SSL信任證書庫的密碼。

sslKeystore(SSL鑰匙庫路徑)

默認值:null

指定SSL鑰匙庫的路徑。

sslKeystorePassword(SSL鑰匙庫密碼)

默認值:null

指定SSL鑰匙庫的密碼。

通過屬性文件,配置的示例如下:

---
sentinelServersConfig:
idleConnectionTimeout: 10000
connectTimeout: 10000
timeout: 3000
retryAttempts: 3
retryInterval: 1500
failedSlaveReconnectionInterval: 3000
failedSlaveCheckInterval: 60000
password: null
subscriptionsPerConnection: 5
clientName: null
loadBalancer: !<org.redisson.connection.balancer.RoundRobinLoadBalancer> {}
subscriptionConnectionMinimumIdleSize: 1
subscriptionConnectionPoolSize: 50
slaveConnectionMinimumIdleSize: 24
slaveConnectionPoolSize: 64
masterConnectionMinimumIdleSize: 24
masterConnectionPoolSize: 64
readMode: "SLAVE"
subscriptionMode: "SLAVE"
sentinelAddresses:
  - "redis://127.0.0.1:26379"
  - "redis://127.0.0.1:26389"
masterName: "mymaster"
database: 0
threads: 16
nettyThreads: 32
codec: !<org.redisson.codec.MarshallingCodec> {}
transportMode: "NIO"

主從模式

介紹配置Redis主從服務組態的文檔在這里.

程序化配置主從模式的方法如下

Config config = new Config();
config.useMasterSlaveServers()
    // use "rediss://" for SSL connection
    .setMasterAddress("redis://127.0.0.1:6379")
    .addSlaveAddress("redis://127.0.0.1:6389", "redis://127.0.0.1:6332", "redis://127.0.0.1:6419")
    .addSlaveAddress("redis://127.0.0.1:6399");

RedissonClient redisson = Redisson.create(config);

主從模式使用到MasterSlaveServersConfig :

MasterSlaveServersConfig masterSlaveConfig = config.useMasterSlaveServers();

MasterSlaveServersConfig 類的設置參數如下:

dnsMonitoringInterval(DNS監控間隔,單位:毫秒)

默認值:5000

用來指定檢查節點DNS變化的時間間隔。使用的時候應該確保JVM里的DNS數據的緩存時間保持在足夠低的范圍才有意義。用-1來禁用該功能。

masterAddress(主節點地址)

可以通過host:port的格式來指定主節點地址。

addSlaveAddress(添加從主節點地址)

可以通過host:port的格式來指定從節點的地址。多個節點可以一次性批量添加。

readMode(讀取操作的負載均衡模式)

默認值: SLAVE(只在從服務節點里讀取)

注:在從服務節點里讀取的數據說明已經至少有兩個節點保存了該數據,確保了數據的高可用性。

設置讀取操作選擇節點的模式。可用值為:SLAVE - 只在從服務節點里讀取。MASTER - 只在主服務節點里讀取。MASTER_SLAVE - 在主從服務節點里都可以讀取。

subscriptionMode(訂閱操作的負載均衡模式)

默認值:SLAVE(只在從服務節點里訂閱)

設置訂閱操作選擇節點的模式。可用值為:SLAVE - 只在從服務節點里訂閱。MASTER - 只在主服務節點里訂閱。

loadBalancer(負載均衡算法類的選擇)

默認值: org.redisson.connection.balancer.RoundRobinLoadBalancer

在使用多個Redis服務節點的環境里,可以選用以下幾種負載均衡方式選擇一個節點:org.redisson.connection.balancer.WeightedRoundRobinBalancer - 權重輪詢調度算法org.redisson.connection.balancer.RoundRobinLoadBalancer - 輪詢調度算法org.redisson.connection.balancer.RandomLoadBalancer - 隨機調度算法

subscriptionConnectionMinimumIdleSize(從節點發布和訂閱連接的最小空閑連接數)

默認值:1

多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的最小保持連接數(長連接)。Redisson內部經常通過發布和訂閱來實現許多功能。長期保持一定數量的發布訂閱連接是必須的。

subscriptionConnectionPoolSize(從節點發布和訂閱連接池大小)

默認值:50

多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的連接池最大容量。連接池的連接數量自動彈性伸縮。

slaveConnectionMinimumIdleSize(從節點最小空閑連接數)

默認值:32

多從節點的環境里,每個 從服務節點里用於普通操作( 發布和訂閱)的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時讀取反映速度。

slaveConnectionPoolSize(從節點連接池大小)

默認值:64

多從節點的環境里,每個 從服務節點里用於普通操作( 發布和訂閱)連接的連接池最大容量。連接池的連接數量自動彈性伸縮。

masterConnectionMinimumIdleSize(主節點最小空閑連接數)

默認值:32

多從節點的環境里,每個 主節點的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時寫入反應速度。

masterConnectionPoolSize(主節點連接池大小)

默認值:64

主節點的連接池最大容量。連接池的連接數量自動彈性伸縮。

idleConnectionTimeout(連接空閑超時,單位:毫秒)

默認值:10000

如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,並從連接池里去掉。時間單位是毫秒。

connectTimeout(連接超時,單位:毫秒)

默認值:10000

同任何節點建立連接時的等待超時。時間單位是毫秒。

timeout(命令等待超時,單位:毫秒)

默認值:3000

等待節點回復命令的時間。該時間從命令發送成功時開始計時。

retryAttempts(命令失敗重試次數)

默認值:3

如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時。

retryInterval(命令重試發送時間間隔,單位:毫秒)

默認值:1500

在一條命令發送失敗以后,等待重試發送的時間間隔。時間單位是毫秒。

reconnectionTimeout(重新連接時間間隔,單位:毫秒)

默認值:3000

當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔。時間單位是毫秒。

failedAttempts(執行失敗最大次數)

默認值:3

在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試。

database(數據庫編號)

默認值:0

嘗試連接的數據庫編號。

password(密碼)

默認值:null

用於節點身份驗證的密碼。

subscriptionsPerConnection(單個連接最大訂閱數量)

默認值:5

每個連接的最大訂閱數量。

clientName(客戶端名稱)

默認值:null

在Redis節點里顯示的客戶端名稱。

sslEnableEndpointIdentification(啟用SSL終端識別)

默認值:true

開啟SSL終端識別能力。

sslProvider(SSL實現方式)

默認值:JDK

確定采用哪種方式(JDK或OPENSSL)來實現SSL連接。

sslTruststore(SSL信任證書庫路徑)

默認值:null

指定SSL信任證書庫的路徑。

sslTruststorePassword(SSL信任證書庫密碼)

默認值:null

指定SSL信任證書庫的密碼。

sslKeystore(SSL鑰匙庫路徑)

默認值:null

指定SSL鑰匙庫的路徑。

sslKeystorePassword(SSL鑰匙庫密碼)

默認值:null

指定SSL鑰匙庫的密碼。

集群模式

集群模式除了適用於Redis集群環境,也適用於任何雲計算服務商提供的集群模式,例如AWS ElastiCache集群版Azure Redis Cache阿里雲(Aliyun)的雲數據庫Redis版

介紹配置Redis集群組態的文檔在這里。 Redis集群組態的最低要求是必須有三個主節點。

集群模式構造Config如下:

Config config = new Config();
config.useClusterServers()
    .setScanInterval(2000) // 集群狀態掃描間隔時間,單位是毫秒
    //可以用"rediss://"來啟用SSL連接
    .addNodeAddress("redis://127.0.0.1:7000", "redis://127.0.0.1:7001")
    .addNodeAddress("redis://127.0.0.1:7002");
RedissonClient redisson = Redisson.create(config);

集群模式使用到ClusterServersConfig :

ClusterServersConfig clusterConfig = config.useClusterServers();

ClusterServersConfig 配置參數如下

nodeAddresses(添加節點地址)

可以通過host:port的格式來添加Redis集群節點的地址。多個節點可以一次性批量添加。

scanInterval(集群掃描間隔時間)

默認值: 1000

對Redis集群節點狀態掃描的時間間隔。單位是毫秒。

slots(分片數量)

默認值: 231用於指定數據分片過程中的分片數量。支持數據分片/框架結構有:集(Set)映射(Map)BitSetBloom filter, Spring CacheHibernate Cache等.

readMode(讀取操作的負載均衡模式)

默認值: SLAVE(只在從服務節點里讀取)

注:在從服務節點里讀取的數據說明已經至少有兩個節點保存了該數據,確保了數據的高可用性。

設置讀取操作選擇節點的模式。可用值為:SLAVE - 只在從服務節點里讀取。MASTER - 只在主服務節點里讀取。MASTER_SLAVE - 在主從服務節點里都可以讀取。

subscriptionMode(訂閱操作的負載均衡模式)

默認值:SLAVE(只在從服務節點里訂閱)

設置訂閱操作選擇節點的模式。可用值為:SLAVE - 只在從服務節點里訂閱。MASTER - 只在主服務節點里訂閱。

loadBalancer(負載均衡算法類的選擇)

默認值: org.redisson.connection.balancer.RoundRobinLoadBalancer

在多Redis服務節點的環境里,可以選用以下幾種負載均衡方式選擇一個節點:org.redisson.connection.balancer.WeightedRoundRobinBalancer - 權重輪詢調度算法org.redisson.connection.balancer.RoundRobinLoadBalancer - 輪詢調度算法org.redisson.connection.balancer.RandomLoadBalancer - 隨機調度算法

subscriptionConnectionMinimumIdleSize(從節點發布和訂閱連接的最小空閑連接數)

默認值:1

多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的最小保持連接數(長連接)。Redisson內部經常通過發布和訂閱來實現許多功能。長期保持一定數量的發布訂閱連接是必須的。

subscriptionConnectionPoolSize(從節點發布和訂閱連接池大小)

默認值:50

多從節點的環境里,每個 從服務節點里用於發布和訂閱連接的連接池最大容量。連接池的連接數量自動彈性伸縮。

slaveConnectionMinimumIdleSize(從節點最小空閑連接數)

默認值:32

多從節點的環境里,每個 從服務節點里用於普通操作( 發布和訂閱)的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時讀取反映速度。

slaveConnectionPoolSize(從節點連接池大小)

默認值:64

多從節點的環境里,每個 從服務節點里用於普通操作( 發布和訂閱)連接的連接池最大容量。連接池的連接數量自動彈性伸縮。

masterConnectionMinimumIdleSize(主節點最小空閑連接數)

默認值:32

多節點的環境里,每個 主節點的最小保持連接數(長連接)。長期保持一定數量的連接有利於提高瞬時寫入反應速度。

masterConnectionPoolSize(主節點連接池大小)

默認值:64

多主節點的環境里,每個 主節點的連接池最大容量。連接池的連接數量自動彈性伸縮。

idleConnectionTimeout(連接空閑超時,單位:毫秒)

默認值:10000

如果當前連接池里的連接數量超過了最小空閑連接數,而同時有連接空閑時間超過了該數值,那么這些連接將會自動被關閉,並從連接池里去掉。時間單位是毫秒。

connectTimeout(連接超時,單位:毫秒)

默認值:10000

同任何節點建立連接時的等待超時。時間單位是毫秒。

timeout(命令等待超時,單位:毫秒)

默認值:3000

等待節點回復命令的時間。該時間從命令發送成功時開始計時。

retryAttempts(命令失敗重試次數)

默認值:3

如果嘗試達到 retryAttempts(命令失敗重試次數) 仍然不能將命令發送至某個指定的節點時,將拋出錯誤。如果嘗試在此限制之內發送成功,則開始啟用 timeout(命令等待超時) 計時。

retryInterval(命令重試發送時間間隔,單位:毫秒)

默認值:1500

在一條命令發送失敗以后,等待重試發送的時間間隔。時間單位是毫秒。

reconnectionTimeout(重新連接時間間隔,單位:毫秒)

默認值:3000

當與某個節點的連接斷開時,等待與其重新建立連接的時間間隔。時間單位是毫秒。

failedAttempts(執行失敗最大次數)

默認值:3

在某個節點執行相同或不同命令時,連續 失敗 failedAttempts(執行失敗最大次數) 時,該節點將被從可用節點列表里清除,直到 reconnectionTimeout(重新連接時間間隔) 超時以后再次嘗試。

password(密碼)

默認值:null

用於節點身份驗證的密碼。

subscriptionsPerConnection(單個連接最大訂閱數量)

默認值:5

每個連接的最大訂閱數量。

clientName(客戶端名稱)

默認值:null

在Redis節點里顯示的客戶端名稱。

sslEnableEndpointIdentification(啟用SSL終端識別)

默認值:true

開啟SSL終端識別能力。

sslProvider(SSL實現方式)

默認值:JDK

確定采用哪種方式(JDK或OPENSSL)來實現SSL連接。

sslTruststore(SSL信任證書庫路徑)

默認值:null

指定SSL信任證書庫的路徑。

sslTruststorePassword(SSL信任證書庫密碼)

默認值:null

指定SSL信任證書庫的密碼。

sslKeystore(SSL鑰匙庫路徑)

默認值:null

指定SSL鑰匙庫的路徑。

sslKeystorePassword(SSL鑰匙庫密碼)

默認值:null

指定SSL鑰匙庫的密碼。

說明:本文會以pdf格式持續更新,更多最新尼恩3高pdf筆記,請從下面的鏈接獲取:語雀 或者 碼雲

簡單Redision鎖的原理

Redis發展到現在,幾種常見的部署架構有:

  1. 單機模式;
  2. 哨兵模式;
  3. 集群模式;

先介紹,基於單機模式的簡單Redision鎖的使用。

簡單Redision鎖的使用

單機模式下,簡單Redision鎖的使用如下:

// 構造redisson實現分布式鎖必要的Config
Config config = new Config();
config.useSingleServer().setAddress("redis://172.29.1.180:5379").setPassword("a123456").setDatabase(0);
// 構造RedissonClient
RedissonClient redissonClient = Redisson.create(config);
// 設置鎖定資源名稱
RLock disLock = redissonClient.getLock("DISLOCK");
//嘗試獲取分布式鎖
boolean isLock= disLock.tryLock(500, 15000, TimeUnit.MILLISECONDS);
if (isLock) {
   try {
        //TODO if get lock success, do something;
        Thread.sleep(15000);

   } catch (Exception e) {
   } finally {
    // 無論如何, 最后都要解鎖
    disLock.unlock();
   }
}

通過代碼可知,經過Redisson的封裝,實現Redis分布式鎖非常方便,和顯式鎖的使用方法是一樣的。RLock接口繼承了 Lock接口。

我們再看一下Redis中的value是啥,和前文分析一樣,hash結構, redis 的key就是資源名稱。

hash結構的key就是UUID+threadId,hash結構的value就是重入值,在分布式鎖時,這個值為1(Redisson還可以實現重入鎖,那么這個值就取決於重入次數了):

172.29.1.180:5379> hgetall DISLOCK
1) "01a6d806-d282-4715-9bec-f51b9aa98110:1"
2) "1"

使用客戶端工具看到的效果如下:

在這里插入圖片描述

getLock()方法

img

可以看到,調用getLock()方法后實際返回一個RedissonLock對象

tryLock方法

下面來看下tryLock方法,源碼如下:

    @Override
    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }

        try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
        
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }

                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }

                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }

以上代碼使用了異步回調模式,RFuture 繼承了 java.util.concurrent.Future , CompletionStage 兩大接口,異步回調模式的基礎知識,請參見 《Java高並發核心編程 卷2 》

在這里插入圖片描述

tryAcquire()方法

在RedissonLock對象的lock()方法主要調用tryAcquire()方法

img

tryLockInnerAsync

img

由於leaseTime == -1,於是走tryLockInnerAsync()方法,這個方法才是關鍵

img

首先,看一下evalWriteAsync方法的定義

<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

這和前面的jedis調用lua腳本類似,最后兩個參數分別是keys和params。

單獨將調用的那一段摘出來看,實際調用是這樣的:

commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                  "if (redis.call('exists', KEYS[1]) == 0) then " +
                      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                      "return nil; " +
                  "end; " +
                  "return redis.call('pttl', KEYS[1]);",
                    Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));

結合上面的參數聲明,我們可以知道,這里KEYS[1]就是getName(),ARGV[2]是getLockName(threadId)

假設:

  • 前面獲取鎖時傳的name是“DISLOCK”,
  • 假設調用的線程ID是1,
  • 假設成員變量UUID類型的id是01a6d806-d282-4715-9bec-f51b9aa98110

那么KEYS[1]=DISLOCK,ARGV[2]=01a6d806-d282-4715-9bec-f51b9aa98110:1

因此,這段腳本的意思是

  1、判斷有沒有一個叫“DISLOCK”的key

  2、如果沒有,則在其下設置一個字段為“01a6d806-d282-4715-9bec-f51b9aa98110:1”,值為“1”的鍵值對 ,並設置它的過期時間

  3、如果存在,則進一步判斷“01a6d806-d282-4715-9bec-f51b9aa98110:1”是否存在,若存在,則其值加1,並重新設置過期時間

  4、返回“DISLOCK”的生存時間(毫秒)

原理:加鎖機制

這里用的數據結構是hash,hash的結構是: key 字段1 值1 字段2 值2 。。。

用在鎖這個場景下,key就表示鎖的名稱,也可以理解為臨界資源,字段就表示當前獲得鎖的線程

所有競爭這把鎖的線程都要判斷在這個key下有沒有自己線程的字段,如果沒有則不能獲得鎖,如果有,則相當於重入,字段值加1(次數)

在這里插入圖片描述

Lua腳本的詳解

為何要使用lua語言?

因為一大堆復雜的業務邏輯,可以通過封裝在lua腳本中發送給redis,保證這段復雜業務邏輯執行的原子性

在這里插入圖片描述

回顧一下evalWriteAsync方法的定義

<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object ... params);

注意,其最后兩個參數分別是keys和params。

關於 lua腳本的參數解釋:

KEYS[1]代表的是你加鎖的那個key,比如說:

RLock lock = redisson.getLock("DISLOCK");

這里你自己設置了加鎖的那個鎖key就是“DISLOCK”。

ARGV[1]代表的就是鎖key的默認生存時間

調用的時候,傳遞的參數為 internalLockLeaseTime ,該值默認30秒。

ARGV[2]代表的是加鎖的客戶端的ID,類似於下面這樣:

01a6d806-d282-4715-9bec-f51b9aa98110:1

lua腳本的第一段if判斷語句,就是用“exists DISLOCK”命令判斷一下,如果你要加鎖的那個鎖key不存在的話,你就進行加鎖。

如何加鎖呢?很簡單,用下面的redis命令:

hset DISLOCK 01a6d806-d282-4715-9bec-f51b9aa98110:1 1

通過這個命令設置一個hash數據結構,這行命令執行后,會出現一個類似下面的數據結構:

DISLOCK:
    {
        8743c9c0-0795-4907-87fd-6c719a6b4586:1 1
    }

接着會執行“pexpire DISLOCK 30000”命令,設置DISLOCK這個鎖key的生存時間是30秒(默認)

鎖互斥機制

那么在這個時候,如果客戶端2來嘗試加鎖,執行了同樣的一段lua腳本,會咋樣呢?

很簡單,第一個if判斷會執行“exists DISLOCK”,發現DISLOCK 這個鎖key已經存在了。

接着第二個if判斷,判斷一下,DISLOCK鎖key的hash數據結構中,是否包含客戶端2的ID,但是明顯不是的,因為那里包含的是客戶端1的ID。

所以,客戶端2會獲取到pttl DISLOCK返回的一個數字,這個數字代表了DISLOCK 這個鎖key的剩余生存時間。比如還剩15000毫秒的生存時間。

此時客戶端2會進入一個while循環,不停的嘗試加鎖。

可重入加鎖機制

如果客戶端1都已經持有了這把鎖了,結果可重入的加鎖會怎么樣呢?

RLock lock = redisson.getLock("DISLOCK")
lock.lock();
//業務代碼
lock.lock();
//業務代碼
lock.unlock();
lock.unlock();

分析上面那段lua腳本。

第一個if判斷肯定不成立,“exists DISLOCK”會顯示鎖key已經存在了。

第二個if判斷會成立,因為DISLOCK的hash數據結構中包含的那個ID,就是客戶端1的那個ID,也就是“8743c9c0-0795-4907-87fd-6c719a6b4586:1”

此時就會執行可重入加鎖的邏輯,他會用:

incrby DISLOCK

8743c9c0-0795-4907-87fd-6c719a6b4586:1 1

通過這個命令,對客戶端1的加鎖次數,累加1。

此時DISLOCK數據結構變為下面這樣:

DISLOCK:
    {
        8743c9c0-0795-4907-87fd-6c719a6b4586:1 2
    }

釋放鎖機制

如果執行lock.unlock(),就可以釋放分布式鎖,此時的業務邏輯也是非常簡單的。

其實說白了,就是每次都對DISLOCK數據結構中的那個加鎖次數減1。

如果發現加鎖次數是0了,說明這個客戶端已經不再持有鎖了,此時就會用:

“del DISLOCK”命令,從redis里刪除這個key。

然后呢,另外的客戶端2就可以嘗試完成加鎖了。

unlock 源碼

  @Override
    public void unlock() {
        try {
            get(unlockAsync(Thread.currentThread().getId()));
        } catch (RedisException e) {
            if (e.getCause() instanceof IllegalMonitorStateException) {
                throw (IllegalMonitorStateException) e.getCause();
            } else {
                throw e;
            }
        }
        
//        Future<Void> future = unlockAsync();
//        future.awaitUninterruptibly();
//        if (future.isSuccess()) {
//            return;
//        }
//        if (future.cause() instanceof IllegalMonitorStateException) {
//            throw (IllegalMonitorStateException)future.cause();
//        }
//        throw commandExecutor.convertException(future);
    }

再深入一下,實際調用的是unlockInnerAsync方法

unlockInnerAsync方法

在這里插入圖片描述

原理:Redision 解鎖機制

上圖沒有截取完整,完整的源碼如下:

    protected RFuture<Boolean> unlockInnerAsync(long threadId) {
        return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                    "return nil;" +
                "end; " +
                "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
                "if (counter > 0) then " +
                    "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                    "return 0; " +
                "else " +
                    "redis.call('del', KEYS[1]); " +
                    "redis.call('publish', KEYS[2], ARGV[1]); " +
                    "return 1; "+
                "end; " +
                "return nil;",
                Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));

    }

我們還是假設name=DISLOCK,假設線程ID是1

同理,我們可以知道

KEYS[1]是getName(),即KEYS[1]=DISLOCK

KEYS[2]是getChannelName(),即KEYS[2]=redisson_lock__channel:{DISLOCK}

ARGV[1]是LockPubSub.unlockMessage,即ARGV[1]=0

ARGV[2]是生存時間

ARGV[3]是getLockName(threadId),即ARGV[3]=8743c9c0-0795-4907-87fd-6c719a6b4586:1

因此,上面腳本的意思是:

  1、判斷是否存在一個叫“DISLOCK”的key

  2、如果不存在,返回nil

  3、如果存在,使用Redis Hincrby 命令用於為哈希表中的字段值加上指定增量值 -1 ,代表減去1

  4、若counter >,返回空,若字段存在,則字段值減1

  5、若減完以后,counter > 0 值仍大於0,則返回0

  6、減完后,若字段值小於或等於0,則用 publish 命令廣播一條消息,廣播內容是0,並返回1;

可以猜測,廣播0表示資源可用,即通知那些等待獲取鎖的線程現在可以獲得鎖了

在這里插入圖片描述

通過redis Channel 解鎖訂閱

以上是正常情況下獲取到鎖的情況,那么當無法立即獲取到鎖的時候怎么辦呢?

再回到前面獲取鎖的位置

@Override
public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }

    //    訂閱
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    commandExecutor.syncSubscription(future);

    try {
        while (true) {
            ttl = tryAcquire(leaseTime, unit, threadId);
            // lock acquired
            if (ttl == null) {
                break;
            }

            // waiting for message
            if (ttl >= 0) {
                getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
            } else {
                getEntry(threadId).getLatch().acquire();
            }
        }
    } finally {
        unsubscribe(future, threadId);
    }
//        get(lockAsync(leaseTime, unit));
}


protected static final LockPubSub PUBSUB = new LockPubSub();

protected RFuture<RedissonLockEntry> subscribe(long threadId) {
    return PUBSUB.subscribe(getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}

protected void unsubscribe(RFuture<RedissonLockEntry> future, long threadId) {
    PUBSUB.unsubscribe(future.getNow(), getEntryName(), getChannelName(), commandExecutor.getConnectionManager().getSubscribeService());
}

這里會訂閱Channel,當資源可用時可以及時知道,並搶占,防止無效的輪詢而浪費資源

這里的channel為:

redisson_lock__channel:

在這里插入圖片描述

在這里插入圖片描述

當資源可用用的時候,循環去嘗試獲取鎖,由於多個線程同時去競爭資源,所以這里用了信號量,對於同一個資源只允許一個線程獲得鎖,其它的線程阻塞

這點,有點兒類似 Zookeeper分布式鎖:

有關zookeeper分布式鎖的原理和實現,具體請參見下面的博客:
Zookeeper 分布式鎖 (圖解+秒懂+史上最全)

watch dog自動延期機制

客戶端1加鎖的鎖key默認生存時間才30秒,如果超過了30秒,客戶端1還想一直持有這把鎖,怎么辦呢?

簡單!只要客戶端1一旦加鎖成功,就會啟動一個watch dog看門狗,他是一個后台線程,會每隔10秒檢查一下,如果客戶端1還持有鎖key,那么就會不斷的延長鎖key的生存時間。

說明:本文會以pdf格式持續更新,更多最新尼恩3高pdf筆記,請從下面的鏈接獲取:語雀 或者 碼雲


使用watchDog機制實現鎖的續期

但是聰明的同學肯定會問:

有效時間設置多長,假如我的業務操作比有效時間長,我的業務代碼還沒執行完,就自動給我解鎖了,不就完蛋了嗎。

這個問題就有點棘手了,在網上也有很多討論:

第一種解決方法就是靠程序員自己去把握,預估一下業務代碼需要執行的時間,然后設置有效期時間比執行時間長一些,保證不會因為自動解鎖影響到客戶端業務代碼的執行。

但是這並不是萬全之策,比如網絡抖動這種情況是無法預測的,也有可能導致業務代碼執行的時間變長,所以並不安全。

第二種方法,使用監事狗watchDog機制實現鎖的續期。

第二種方法比較靠譜一點,而且無業務入侵。

在Redisson框架實現分布式鎖的思路,就使用watchDog機制實現鎖的續期。

當加鎖成功后,同時開啟守護線程,默認有效期是30秒,每隔10秒就會給鎖續期到30秒,只要持有鎖的客戶端沒有宕機,就能保證一直持有鎖,直到業務代碼執行完畢由客戶端自己解鎖,如果宕機了自然就在有效期失效后自動解鎖。

這里,和前面解決 JVM STW的鎖過期問題有點類似,只不過,watchDog自動續期,也沒有完全解決JVM STW的鎖過期問題。

如何徹底解決 JVM STW的鎖過期問題,可以來瘋狂創客圈的社群討論。

redisson watchdog 使用和原理

實際上,redisson加鎖的基本流程圖如下:

在這里插入圖片描述

這里專注於介紹watchdog。

首先watchdog的具體思路是 加鎖時,默認加鎖 30秒,每10秒鍾檢查一次,如果存在就重新設置 過期時間為30秒。

然后設置默認加鎖時間的參數是 lockWatchdogTimeout(監控鎖的看門狗超時,單位:毫秒)

官方文檔描述如下

lockWatchdogTimeout(監控鎖的看門狗超時,單位:毫秒)

默認值:30000

監控鎖的看門狗超時時間單位為毫秒。該參數只適用於分布式鎖的加鎖請求中未明確使用leaseTimeout參數的情況。如果該看門狗未使用lockWatchdogTimeout去重新調整一個分布式鎖的lockWatchdogTimeout超時,那么這個鎖將變為失效狀態。這個參數可以用來避免由Redisson客戶端節點宕機或其他原因造成死鎖的情況。

需要注意的是

1.watchDog 只有在未顯示指定加鎖時間時才會生效。(這點很重要)

2.lockWatchdogTimeout設定的時間不要太小 ,比如我之前設置的是 100毫秒,由於網絡直接導致加鎖完后,watchdog去延期時,這個key在redis中已經被刪除了。

tryAcquireAsync原理

在調用lock方法時,會最終調用到tryAcquireAsync。詳細解釋如下:

private <T> RFuture<Long> tryAcquireAsync(long waitTime, long leaseTime, TimeUnit unit, long threadId) {
    //如果指定了加鎖時間,會直接去加鎖
        if (leaseTime != -1) {
            return tryLockInnerAsync(waitTime, leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
        }
    //沒有指定加鎖時間 會先進行加鎖,並且默認時間就是 LockWatchdogTimeout的時間
    //這個是異步操作 返回RFuture 類似netty中的future
        RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(waitTime,
                                                commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(),
                                                TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
       //這里也是類似netty Future 的addListener,在future內容執行完成后執行
        ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
            if (e != null) {
                return;
            }

            // lock acquired
            if (ttlRemaining == null) {
            //這里是定時執行 當前鎖自動延期的動作
                scheduleExpirationRenewal(threadId);
            }
        });
        return ttlRemainingFuture;
    }

scheduleExpirationRenewal 中會調用renewExpiration。

renewExpiration執行延期動作

這里我們可以看到是 啟用了一個timeout定時,去執行延期動作

    private void renewExpiration() {
   
      
        Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
                if (ent == null) {
                    return;
                }
                Long threadId = ent.getFirstThreadId();
                if (threadId == null) {
                    return;
                }
                
                RFuture<Boolean> future = renewExpirationAsync(threadId);
                future.onComplete((res, e) -> {
                    if (e != null) {
                        log.error("Can't update lock " + getName() + " expiration", e);
                        return;
                    }
                    
                    if (res) {
                     	//如果 沒有報錯,就再次定時延期
                     // reschedule itself
                     
                        renewExpiration();
                    }
                });
            }
            // 這里我們可以看到定時任務 是 lockWatchdogTimeout 的1/3時間去執行 renewExpirationAsync
        }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);
        
        ee.setTimeout(task);
    }

最終 scheduleExpirationRenewal會調用到 renewExpirationAsync,

renewExpirationAsync

執行下面這段 lua腳本。他主要判斷就是 這個鎖是否在redis中存在,如果存在就進行 pexpire 延期。

   protected RFuture<Boolean> renewExpirationAsync(long threadId) {
        return evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                        "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                        "return 1; " +
                        "end; " +
                        "return 0;",
                Collections.singletonList(getName()),
                internalLockLeaseTime, getLockName(threadId));
    }

watchLog總結

1.要使 watchLog機制生效 ,lock時 不要設置 過期時間

2.watchlog的延時時間 可以由 lockWatchdogTimeout指定默認延時時間,但是不要設置太小。如100

3.watchdog 會每 lockWatchdogTimeout/3時間,去延時。

4.watchdog 通過 類似netty的 Future功能來實現異步延時

5.watchdog 最終還是通過 lua腳本來進行延時

說明:本文會以pdf格式持續更新,更多最新尼恩3高pdf筆記,請從下面的鏈接獲取:語雀 或者 碼雲

Redisson框架的分布式鎖

Redisson框架十分強大,除了前面介紹的 getLock方法獲取的分布式鎖(輸入可重入鎖的類型),還有很多其他的分布式鎖類型。

總體的Redisson框架的分布式鎖類型,大致如下:

  • 可重入鎖
  • 公平鎖
  • 聯鎖
  • 紅鎖
  • 讀寫鎖
  • 信號量
  • 可過期信號量
  • 閉鎖(/倒數閂)

1.可重入鎖(Reentrant Lock)

Redisson的分布式可重入鎖RLock Java對象實現了java.util.concurrent.locks.Lock接口,同時還支持自動過期解鎖。

public void testReentrantLock(RedissonClient redisson){
	RLock lock = redisson.getLock("anyLock");
	try{
		// 1. 最常見的使用方法
		//lock.lock();
		// 2. 支持過期解鎖功能,10秒鍾以后自動解鎖, 無需調用unlock方法手動解鎖
		//lock.lock(10, TimeUnit.SECONDS);
		// 3. 嘗試加鎖,最多等待3秒,上鎖以后10秒自動解鎖
		boolean res = lock.tryLock(3, 10, TimeUnit.SECONDS);
		if(res){ //成功
		// do your business
		}
	} catch (InterruptedException e) {
		e.printStackTrace();
	} finally {
		lock.unlock();
	}
}

Redisson同時還為分布式鎖提供了異步執行的相關方法:

public void testAsyncReentrantLock(RedissonClient redisson){
	RLock lock = redisson.getLock("anyLock");
	try{
		lock.lockAsync();
		lock.lockAsync(10, TimeUnit.SECONDS);
		Future<Boolean> res = lock.tryLockAsync(3, 10, TimeUnit.SECONDS);
		if(res.get()){
		// do your business
		}
	} catch (InterruptedException e) {
		e.printStackTrace();
	} catch (ExecutionException e) {
		e.printStackTrace();
	} finally {
		lock.unlock();
	}
}

2.公平鎖(Fair Lock)

Redisson分布式可重入公平鎖也是實現了java.util.concurrent.locks.Lock接口的一種RLock對象。在提供了自動過期解鎖功能的同時,保證了當多個Redisson客戶端線程同時請求加鎖時,優先分配給先發出請求的線程。

public void testFairLock(RedissonClient redisson){
	RLock fairLock = redisson.getFairLock("anyLock");
	try{
		// 最常見的使用方法
		fairLock.lock();
		// 支持過期解鎖功能, 10秒鍾以后自動解鎖,無需調用unlock方法手動解鎖
		fairLock.lock(10, TimeUnit.SECONDS);
		// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
		boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS);
	} catch (InterruptedException e) {
		e.printStackTrace();
	} finally {
		fairLock.unlock();
	}
}

Redisson同時還為分布式可重入公平鎖提供了異步執行的相關方法:

RLock fairLock = redisson.getFairLock("anyLock");
fairLock.lockAsync();
fairLock.lockAsync(10, TimeUnit.SECONDS);
Future<Boolean> res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);

3.聯鎖(MultiLock)

Redisson的RedissonMultiLock對象可以將多個RLock對象關聯為一個聯鎖,每個RLock對象實例可以來自於不同的Redisson實例。

public void testMultiLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){
	RLock lock1 = redisson1.getLock("lock1");
	RLock lock2 = redisson2.getLock("lock2");
	RLock lock3 = redisson3.getLock("lock3");
	RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3);
	try {
		// 同時加鎖:lock1 lock2 lock3, 所有的鎖都上鎖成功才算成功。
		lock.lock();
		// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
		boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
	} catch (InterruptedException e) {
		e.printStackTrace();
	} finally {
		lock.unlock();
	}
}

4.紅鎖(RedLock)

Redisson的RedissonRedLock對象實現了Redlock介紹的加鎖算法。該對象也可以用來將多個RLock對象關聯為一個紅鎖,每個RLock對象實例可以來自於不同的Redisson實例。

public void testRedLock(RedissonClient redisson1,RedissonClient redisson2, RedissonClient redisson3){
	RLock lock1 = redisson1.getLock("lock1");
	RLock lock2 = redisson2.getLock("lock2");
	RLock lock3 = redisson3.getLock("lock3");
	RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3);
	try {
		// 同時加鎖:lock1 lock2 lock3, 紅鎖在大部分節點上加鎖成功就算成功。
		lock.lock();
		// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
		boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
	} catch (InterruptedException e) {
		e.printStackTrace();
	} finally {
		lock.unlock();
	}
}

5.讀寫鎖(ReadWriteLock)

Redisson的分布式可重入讀寫鎖RReadWriteLock,Java對象實現了java.util.concurrent.locks.ReadWriteLock接口。同時還支持自動過期解鎖。該對象允許同時有多個讀取鎖,但是最多只能有一個寫入鎖。

RReadWriteLock rwlock = redisson.getLock("anyRWLock");
// 最常見的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
// 支持過期解鎖功能
// 10秒鍾以后自動解鎖
// 無需調用unlock方法手動解鎖
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();

6.信號量(Semaphore)

Redisson的分布式信號量(Semaphore)Java對象RSemaphore采用了與java.util.concurrent.Semaphore相似的接口和用法。

RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();

7.可過期性信號量(PermitExpirableSemaphore)

Redisson的可過期性信號量(PermitExpirableSemaphore)實在RSemaphore對象的基礎上,為每個信號增加了一個過期時間。每個信號可以通過獨立的ID來辨識,釋放時只能通過提交這個ID才能釋放。

RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore");
String permitId = semaphore.acquire();
// 獲取一個信號,有效期只有2秒鍾。
String permitId = semaphore.acquire(2, TimeUnit.SECONDS);
// ...
semaphore.release(permitId);

8.閉鎖/倒數閂(CountDownLatch)

Redisson的分布式閉鎖(CountDownLatch)Java對象RCountDownLatch采用了與java.util.concurrent.CountDownLatch相似的接口和用法。

RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.trySetCount(1);
latch.await();
// 在其他線程或其他JVM里
RCountDownLatch latch = redisson.getCountDownLatch("anyCountDownLatch");
latch.countDown();

說明:本文會以pdf格式持續更新,更多最新尼恩3高pdf筆記,請從下面的鏈接獲取:語雀 或者 碼雲

redis分布式鎖的高可用

關於Redis分布式鎖的高可用問題,大致如下:

在master- slave的集群架構中,就是如果你對某個redis master實例,寫入了DISLOCK這種鎖key的value,此時會異步復制給對應的master slave實例。

但是,這個過程中一旦發生redis master宕機,主備切換,redis slave變為了redis master。而此時的主從復制沒有徹底完成.....

接着就會導致,客戶端2來嘗試加鎖的時候,在新的redis master上完成了加鎖,而客戶端1也以為自己成功加了鎖。

此時就會導致多個客戶端對一個分布式鎖完成了加鎖。

這時系統在業務語義上一定會出現問題,導致臟數據的產生。

所以這個是是redis master-slave架構的主從異步復制導致的redis分布式鎖的最大缺陷:

在redis master實例宕機的時候,可能導致多個客戶端同時完成加鎖。

高可用的RedLock(紅鎖)原理

RedLock算法思想:

不能只在一個redis實例上創建鎖,應該是在多個redis實例上創建鎖,n / 2 + 1,必須在大多數redis節點上都成功創建鎖,才能算這個整體的RedLock加鎖成功,避免說僅僅在一個redis實例上加鎖而帶來的問題。

這個場景是假設有一個 redis cluster,有 5 個 redis master 實例。然后執行如下步驟獲取一把紅鎖:

  1. 獲取當前時間戳,單位是毫秒;
  2. 跟上面類似,輪流嘗試在每個 master 節點上創建鎖,過期時間較短,一般就幾十毫秒;
  3. 嘗試在大多數節點上建立一個鎖,比如 5 個節點就要求是 3 個節點 n / 2 + 1;
  4. 客戶端計算建立好鎖的時間,如果建立鎖的時間小於超時時間,就算建立成功了;
  5. 要是鎖建立失敗了,那么就依次之前建立過的鎖刪除;
  6. 只要別人建立了一把分布式鎖,你就得不斷輪詢去嘗試獲取鎖。

img

RedLock是基於redis實現的分布式鎖,它能夠保證以下特性:

  • 互斥性:在任何時候,只能有一個客戶端能夠持有鎖;避免死鎖:

  • 當客戶端拿到鎖后,即使發生了網絡分區或者客戶端宕機,也不會發生死鎖;(利用key的存活時間)

  • 容錯性:只要多數節點的redis實例正常運行,就能夠對外提供服務,加鎖或者釋放鎖;

以sentinel模式架構為例,如下圖所示,有sentinel-1,sentinel-2,sentinel-3總計3個sentinel模式集群,如果要獲取分布式鎖,那么需要向這3個sentinel集群通過EVAL命令執行LUA腳本,需要3/2+1=2,即至少2個sentinel集群響應成功,才算成功的以Redlock算法獲取到分布式鎖:

Redisson

高可用的紅鎖會導致性能降低

提前說明,使用redis分布式鎖,是追求高性能, 在cap理論中,追求的是 ap 而不是cp。

所以,如果追求高可用,建議使用 zookeeper分布式鎖。

redis分布式鎖可能導致的數據不一致性,建議使用業務補償的方式去彌補。所以,不太建議使用紅鎖,但是從學習的層面來說,大家還是一定要掌握的。

實現原理

Redisson中有一個MultiLock的概念,可以將多個鎖合並為一個大鎖,對一個大鎖進行統一的申請加鎖以及釋放鎖

而Redisson中實現RedLock就是基於MultiLock 去做的,接下來就具體看看對應的實現吧

RedLock使用案例

先看下官方的代碼使用:
(https://github.com/redisson/redisson/wiki/8.-distributed-locks-and-synchronizers#84-redlock)

RLock lock1 = redisson1.getLock("lock1");
RLock lock2 = redisson2.getLock("lock2");
RLock lock3 = redisson3.getLock("lock3");

RLock redLock = anyRedisson.getRedLock(lock1, lock2, lock3);

// traditional lock method
redLock.lock();

// or acquire lock and automatically unlock it after 10 seconds
redLock.lock(10, TimeUnit.SECONDS);

// or wait for lock aquisition up to 100 seconds 
// and automatically unlock it after 10 seconds
boolean res = redLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
   try {
     ...
   } finally {
       redLock.unlock();
   }
}

這里是分別對3個redis實例加鎖,然后獲取一個最后的加鎖結果。

RedissonRedLock實現原理

上面示例中使用redLock.lock()或者tryLock()最終都是執行RedissonRedLock中方法。

RedissonRedLock 繼承自RedissonMultiLock, 實現了其中的一些方法:

public class RedissonRedLock extends RedissonMultiLock {
    public RedissonRedLock(RLock... locks) {
        super(locks);
    }

    /**
     * 鎖可以失敗的次數,鎖的數量-鎖成功客戶端最小的數量
     */
    @Override
    protected int failedLocksLimit() {
        return locks.size() - minLocksAmount(locks);
    }
    
    /**
     * 鎖的數量 / 2 + 1,例如有3個客戶端加鎖,那么最少需要2個客戶端加鎖成功
     */
    protected int minLocksAmount(final List<RLock> locks) {
        return locks.size()/2 + 1;
    }

    /** 
     * 計算多個客戶端一起加鎖的超時時間,每個客戶端的等待時間
     * remainTime默認為4.5s
     */
    @Override
    protected long calcLockWaitTime(long remainTime) {
        return Math.max(remainTime / locks.size(), 1);
    }
    
    @Override
    public void unlock() {
        unlockInner(locks);
    }

}

看到locks.size()/2 + 1 ,例如我們有3個客戶端實例,那么最少2個實例加鎖成功才算分布式鎖加鎖成功。

接着我們看下lock()的具體實現

RedissonMultiLock實現原理


public class RedissonMultiLock implements Lock {

    final List<RLock> locks = new ArrayList<RLock>();

    public RedissonMultiLock(RLock... locks) {
        if (locks.length == 0) {
            throw new IllegalArgumentException("Lock objects are not defined");
        }
        this.locks.addAll(Arrays.asList(locks));
    }

    public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long newLeaseTime = -1;
        if (leaseTime != -1) {
            // 如果等待時間設置了,那么將等待時間 * 2
            newLeaseTime = unit.toMillis(waitTime)*2;
        }
        
        // time為當前時間戳
        long time = System.currentTimeMillis();
        long remainTime = -1;
        if (waitTime != -1) {
            remainTime = unit.toMillis(waitTime);
        }
        // 計算鎖的等待時間,RedLock中:如果remainTime=-1,那么lockWaitTime為1
        long lockWaitTime = calcLockWaitTime(remainTime);
        
        // RedLock中failedLocksLimit即為n/2 + 1
        int failedLocksLimit = failedLocksLimit();
        List<RLock> acquiredLocks = new ArrayList<RLock>(locks.size());
        // 循環每個redis客戶端,去獲取鎖
        for (ListIterator<RLock> iterator = locks.listIterator(); iterator.hasNext();) {
            RLock lock = iterator.next();
            boolean lockAcquired;
            try {
                // 調用tryLock方法去獲取鎖,如果獲取鎖成功,則lockAcquired=true
                if (waitTime == -1 && leaseTime == -1) {
                    lockAcquired = lock.tryLock();
                } else {
                    long awaitTime = Math.min(lockWaitTime, remainTime);
                    lockAcquired = lock.tryLock(awaitTime, newLeaseTime, TimeUnit.MILLISECONDS);
                }
            } catch (Exception e) {
                lockAcquired = false;
            }
            
            // 如果獲取鎖成功,將鎖加入到list集合中
            if (lockAcquired) {
                acquiredLocks.add(lock);
            } else {
                // 如果獲取鎖失敗,判斷失敗次數是否等於失敗的限制次數
                // 比如,3個redis客戶端,最多只能失敗1次
                // 這里locks.size = 3, 3-x=1,說明只要成功了2次就可以直接break掉循環
                if (locks.size() - acquiredLocks.size() == failedLocksLimit()) {
                    break;
                }

                // 如果最大失敗次數等於0
                if (failedLocksLimit == 0) {
                    // 釋放所有的鎖,RedLock加鎖失敗
                    unlockInner(acquiredLocks);
                    if (waitTime == -1 && leaseTime == -1) {
                        return false;
                    }
                    failedLocksLimit = failedLocksLimit();
                    acquiredLocks.clear();
                    // 重置迭代器 重試再次獲取鎖
                    while (iterator.hasPrevious()) {
                        iterator.previous();
                    }
                } else {
                    // 失敗的限制次數減一
                    // 比如3個redis實例,最大的限制次數是1,如果遍歷第一個redis實例,失敗了,那么failedLocksLimit會減成0
                    // 如果failedLocksLimit就會走上面的if邏輯,釋放所有的鎖,然后返回false
                    failedLocksLimit--;
                }
            }
            
            if (remainTime != -1) {
                remainTime -= (System.currentTimeMillis() - time);
                time = System.currentTimeMillis();
                if (remainTime <= 0) {
                    unlockInner(acquiredLocks);
                    return false;
                }
            }
        }

        if (leaseTime != -1) {
            List<RFuture<Boolean>> futures = new ArrayList<RFuture<Boolean>>(acquiredLocks.size());
            for (RLock rLock : acquiredLocks) {
                RFuture<Boolean> future = rLock.expireAsync(unit.toMillis(leaseTime), TimeUnit.MILLISECONDS);
                futures.add(future);
            }
            
            for (RFuture<Boolean> rFuture : futures) {
                rFuture.syncUninterruptibly();
            }
        }
        
        return true;
    }
}

核心代碼都已經加了注釋,實現原理其實很簡單,基於RedLock思想,遍歷所有的Redis客戶端,然后依次加鎖,最后統計成功的次數來判斷是否加鎖成功。

說明:本文會以pdf格式持續更新,更多最新尼恩3高pdf筆記,請從下面的鏈接獲取:語雀 或者 碼雲

Redis分段鎖

普通Redis分布式鎖的性能瓶頸問題

分布式鎖一旦加了之后,對同一個商品的下單請求,會導致所有下單操作,都必須對同一個商品key加分布式鎖。

假設某個場景,一個商品1分鍾6000訂單,每秒的 600個下單操作,

假設加鎖之后,釋放鎖之前,查庫存 -> 創建訂單 -> 扣減庫存,每個IO操作100ms,大概300毫秒。

具體如下圖:

在這里插入圖片描述

可以再進行一下優化,將 創建訂單 + 扣減庫存 並發執行,將兩個100ms 減少為一個100ms,這既是空間換時間的思想,大概200毫秒。

在這里插入圖片描述

將 創建訂單 + 扣減庫存 批量執行,減少一次IO,也是大概200毫秒。

這個優化方案,有個重要的前提,就是 訂單表和庫存表在相同的庫中,但是,這個前提條件,在數據量大+高並發的場景下,夠嗆。

package com.crazymaker.springcloud;

import com.crazymaker.springcloud.common.util.ThreadUtil;
import org.junit.Test;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;

public class CoCurrentDemo {


    /**
     * 使用CompletableFuture 和  CountDownLatch  進行並發回調
     */

    @Test
    public void testMutiCallBack() {
        CountDownLatch countDownLatch = new CountDownLatch(10);
        //批量異步
        ExecutorService executor = ThreadUtil.getIoIntenseTargetThreadPool();
        long start = System.currentTimeMillis();
        for (int i = 0; i < 10; i++) {
            CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> {
                long tid = ThreadUtil.getCurThreadId();
                 try {
                    System.out.println("線程" + tid + "開始了,模擬一下遠程調用");
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                return tid;
            }, executor);

            future.thenAccept((tid) -> {
                System.out.println("線程" + tid + "結束了");
                countDownLatch.countDown();
            });
        }
        try {
            countDownLatch.await();
            //輸出統計結果
            float time = System.currentTimeMillis() - start;

            System.out.println("所有任務已經執行完畢");
            System.out.println("運行的時長為(ms):" + time);

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

那么,一秒內,只能完成多少個商品的秒殺訂單的下單操作呢?

1000毫秒 / 200 =5 個訂單

如何達到每秒600個下單呢? 還是要從基礎知識里邊尋找答案?

分段加鎖的思想來源

分段加鎖的思想來源與基礎知識。

我經常在瘋狂創客圈社群里邊,對小伙伴們強調 基礎知識的重要性,反復強調, 《Java 高並發三部曲》 一定要多刷,最好刷三遍。

《Java 高並發核心編程 卷2》 介紹了 JUC的 LongAdder 和 ConcurrentHashMap的源碼和底層原理,他們提升性能的辦法是:

空間換時間, 分段加鎖

尤其是 LongAdder 的實現思想,可以用於 Redis分布式鎖 作為性能提升的手段,將 Redis分布式鎖 優化為 Redis分段鎖。

有關LongAdder 的系統化學習

有關LongAdder 的系統化學習,請參見 《Java 高並發核心編程 卷2》

在這里插入圖片描述

使用Redis分段鎖提升秒殺的並發性能

回到前面的場景:

假設一個商品1分鍾6000訂單,每秒的 600個下單操作,

假設加鎖之后,釋放鎖之前,查庫存 -> 創建訂單 -> 扣減庫存,經過優化,每個IO操作100ms,大概200毫秒,一秒鍾5個訂單。

如何提高性能呢? 空間換時間

為了達到每秒600個訂單,可以將鎖分成 600 /5 =120 個段,反過來, 每個段1秒可以操作5次, 120個段,合起來,及時每秒操作600次。

進行搶奪鎖的,如果申請到一個具體的段呢?

每一次使用隨機算法,隨機到一個分段, 如果不行,就輪詢下一個分段,具體的流程,大致如下:

在這里插入圖片描述

缺點:

這個是一個理論的時間預估,沒有扣除 嘗試下一個分段的 時間, 另外,實際上的性能, 會比理論上差,從咱們實操案例的測試結果,也可以證明這點。

實戰: 手寫一個Redis分段鎖

package com.crazymaker.springcloud.standard.lock;

import com.crazymaker.springcloud.common.util.RandomUtil;
import com.crazymaker.springcloud.common.util.ThreadUtil;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

@Slf4j
@Data
@AllArgsConstructor
public class JedisMultiSegmentLock implements Lock {

    public static final int NO_SEG = -1;
    //拿到鎖的線程
    private Thread thread;

    //拿到鎖的狀態
    private volatile boolean isLocked = false;

    //段數
    private final int segAmount;

    public static final int DEFAULT_TIMEOUT = 2000;
    public static final Long WAIT_GAT = Long.valueOf(100);

    //內部的鎖
    InnerLock[] innerLocks = null;

    //被鎖住的分段
    int segmentIndexLocked = NO_SEG;
    /**
     * 默認為2000ms
     */
    long expire = 2000L;
    int segmentIndex = 0;

    public JedisMultiSegmentLock(String lockKey, String requestId, int segAmount) {
        this.segAmount = segAmount;
        innerLocks = new InnerLock[segAmount];
        for (int i = 0; i < this.segAmount; i++) {
            //每一個分段,加上一個編號
            String innerLockKey = lockKey + ":" + i;
            innerLocks[i] = new InnerLock(innerLockKey, requestId);
        }
        segmentIndex = RandomUtil.randInModLower(this.segAmount);
    }


    /**
     * 獲取一個分布式鎖 , 超時則返回失敗
     *
     * @return 獲鎖成功 - true | 獲鎖失敗 - false
     */
    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {

        //本地可重入
        if (isLocked && thread == Thread.currentThread()) {
            return true;
        }
        expire = unit != null ? unit.toMillis(time) : DEFAULT_TIMEOUT;
        long startMillis = System.currentTimeMillis();
        Long millisToWait = expire;

        boolean localLocked = false;

        int turn = 1;

        InnerLock innerLock = innerLocks[segmentIndex];

        while (!localLocked) {

            localLocked = innerLock.lock(expire);
            if (!localLocked) {
                millisToWait = millisToWait - (System.currentTimeMillis() - startMillis);
                startMillis = System.currentTimeMillis();
                if (millisToWait > 0L) {
                    /**
                     * 還沒有超時
                     */
                    ThreadUtil.sleepMilliSeconds(WAIT_GAT);
                    log.info("睡眠一下,重新開始,turn:{},剩余時間:{}", turn++, millisToWait);

                    segmentIndex++;
                    if (segmentIndex >= this.segAmount) {
                        segmentIndex = 0;
                    }
                    innerLock = innerLocks[segmentIndex];
                } else {
                    log.info("搶鎖超時");
                    return false;
                }
            } else {
                segmentIndexLocked = segmentIndex;
                isLocked = true;
                localLocked = true;
                thread = Thread.currentThread();
            }
        }
        return isLocked;
    }


    /**
     * 搶奪鎖
     */
    @Override
    public void lock() {
        throw new IllegalStateException(
                "方法 'lock' 尚未實現!");
    }


    //釋放鎖
    @Override
    public void unlock() {
        if (segmentIndexLocked == NO_SEG) {
            return;
        }
        this.innerLocks[segmentIndexLocked].unlock();

        segmentIndexLocked = NO_SEG;
        thread = null;
        isLocked = false;
    }

    @Override
    public Condition newCondition() {
        throw new IllegalStateException(
                "方法 'newCondition' 尚未實現!");
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        throw new IllegalStateException(
                "方法 'lockInterruptibly' 尚未實現!");

    }


    @Override
    public boolean tryLock() {
        throw new IllegalStateException(
                "方法 'tryLock' 尚未實現!");
    }

}

說明:本文會以pdf格式持續更新,更多最新尼恩3高pdf筆記,請從下面的鏈接獲取:語雀 或者 碼雲

尼恩的忠實建議:

強烈參照 LongAdder ,手寫一個Redis分段鎖,

這里, 還是有點復雜,但是很重要,建議大家動手干一票.

  • 理論水平的提升,看看視頻、看看書,只有兩個字,就是需要:多看。
  • 實戰水平的提升,只有兩個字,就是需要:多干。

手寫一個Redis分段鎖的實操,是高並發實戰的重要動手實操之一。

有關Redis分段鎖的實操的具體材料、源碼、問題,歡迎來 瘋狂創客圈社群交流。

高並發Java發燒友社群 - 瘋狂創客圈 總入口 點擊了解詳情

文章核心內容和源碼來源

圖書:《Netty Zookeeper Redis 高並發實戰》 圖書簡介 - 瘋狂創...

參考文檔:

圖書:《Netty Zookeeper Redis 高並發實戰》 圖書簡介 - 瘋狂創...
Distributed locks with Redis
how-to-do-distributed-locking
redisson watchdog 使用和原理
zookeeper實現分布式鎖_java_腳本之家
基於Zookeeper分布式鎖實現 - SegmentFault 思否
分布式鎖用 Redis 還是 Zookeeper - 知乎
ZooKeeper分布式鎖的實現原理 - 菜鳥奮斗史 - 博客園
https://blog.csdn.net/men_wen/article/details/72853078


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM