Redis的分布式鎖詳解


Redis實現的分布式鎖

  1. # 對資源key加鎖,key不存在時創建,並且設置, 10秒自動過期
  2. SET key value EX  10 NX
  3. # 刪除key
  4. DEL key

NX的作用

NX參數是為了保證當分布式鎖不存在時,只有一個client能寫入次key成功,獲取到鎖。

分布式鎖的第一核心要素就是互斥性、安全性,在同一時間內,不允許多個client同時獲得鎖

未設置key的自動過期時間

分布式鎖的第二個核心要素,活性。在實現分布式鎖的過程中要考慮到client可能會出現crash或者網絡分區,需要原子申請分布式鎖以及設置鎖的自動過期時間,
通過過期、超時等機智自動釋放鎖,避免死鎖,導致業務中斷。

設置過期時間,仍然出現超賣

Redis分布式鎖的是實現是阻塞的請求執行完成后,不能保證原子性操作,可以通過lua腳本來實現redis比較庫存、扣件庫存操作的原子性。

為什么喜歡用redis做分布式鎖

Redis的核心優點是快、簡單、部署方便。

Redis分布式鎖的問題

Redis分布式鎖存在的問題:

  • 1.存在單點故障問題,官方給出了 解決的方法,就是RedLock算法。

  • 2.獲取鎖失敗后,只能拋出異常,不能阻塞線程。Redisson 開源框架解決了這些問題。

分布式鎖創建的方案

Zookeeper是一個典型的分布式元數據存儲服務,它的分布式鎖實現基於Zookeeper的臨時節點和順序特性。

臨時節點具備數據自動刪除的功能,當client和Zookeeper連接和session斷掉時,相應的臨時節點就會被刪除。

Zookeeper也提供了Watch特性可監聽key的數據變化。

etcd分布式鎖實現

事務與鎖的安全性

etcd的事務特性有IF語句、ELSE語句、THEN語句組成,IF語句支持比較key的是修改版本號mod_version和創建版本號create_version.

可以通過key的創建版本號create_version來檢查key是否存在,如果不存在,create_revision的版本號是0。

Lease與鎖的活性

Lease是一種活性檢測機制,提供了檢測各個客戶端存活的能力。
通過Lease機制就可以優雅地解決了client出現crash故障、client與etcd集群網絡出現隔離等各類故障場景下的死鎖問題,超過Lease TTL,就會自動釋放。

Watch與鎖的可用性

watch提供高效的數據監聽能力,當client收到watch delete事件后,就可以快速判斷自己是否有資格獲取鎖,極大減少了鎖的不可用時間。

Jedis與Redisson選型對比

1  概述

1.1.       主要內容

本文的主要內容為對比Redis的兩個框架:Jedis與Redisson,分析各自的優勢與缺點,為項目中Java緩存方案中的Redis編程模型的選擇提供參考。

2.    JedisRedisson對比

2.1.    概況對比

Jedis是Redis的Java實現的客戶端,其API提供了比較全面的Redis命令的支持;Redisson實現了分布式和可擴展的Java數據結構,和Jedis相比,功能較為簡單,不支持字符串操作,不支持排序、事務、管道、分區等Redis特性。Redisson的宗旨是促進使用者對Redis的關注分離,從而讓使用者能夠將精力更集中地放在處理業務邏輯上。

2.2.    編程模型

Jedis中的方法調用是比較底層的暴露的Redis的API,也即Jedis中的Java方法基本和Redis的API保持着一致,了解Redis的API,也就能熟練的使用Jedis。而Redisson中的方法則是進行比較高的抽象,每個方法調用可能進行了一個或多個Redis方法調用。

如下分別為Jedis和Redisson操作的簡單示例:

Jedis設置key-value與set操作:

Jedis jedis = …;

jedis.set("key", "value");

List<String> values = jedis.mget("key", "key2", "key3");

Redisson操作map:

Redisson redisson = …

RMap map = redisson.getMap("my-map"); // implement java.util.Map

map.put("key", "value");

map.containsKey("key");

map.get("key");

2.3.    可伸縮性

Jedis使用阻塞的I/O,且其方法調用都是同步的,程序流需要等到sockets處理完I/O才能執行,不支持異步。Jedis客戶端實例不是線程安全的,所以需要通過連接池來使用Jedis。

Redisson使用非阻塞的I/O和基於Netty框架的事件驅動的通信層,其方法調用是異步的。Redisson的API是線程安全的,所以可以操作單個Redisson連接來完成各種操作。

2.4.    數據結構

Jedis僅支持基本的數據類型如:String、Hash、List、Set、Sorted Set。

Redisson不僅提供了一系列的分布式Java常用對象,基本可以與Java的基本數據結構通用,還提供了許多分布式服務,其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service)。

在分布式開發中,Redisson可提供更便捷的方法。

2.5.    第三方框架整合

1       Redisson提供了和Spring框架的各項特性類似的,以Spring XML的命名空間的方式配置RedissonClient實例和它所支持的所有對象和服務;

2       Redisson完整的實現了Spring框架里的緩存機制;

3       Redisson在Redis的基礎上實現了Java緩存標准規范;

4       Redisson為Apache Tomcat集群提供了基於Redis的非黏性會話管理功能。該功能支持Apache Tomcat的6、7和8版。

5  Redisson還提供了Spring Session會話管理器的實現。

 

Redisson幾種鎖說明

1. 可重入鎖(Reentrant Lock)

Redisson的分布式可重入鎖RLock Java對象實現了java.util.concurrent.locks.Lock接口,同時還支持自動過期解鎖。

    1. public void testReentrantLock(RedissonClient redisson){ RLock lock = redisson.getLock("anyLock"); try{ // 1. 最常見的使用方法 //lock.lock(); // 2. 支持過期解鎖功能,10秒鍾以后自動解鎖, 無需調用unlock方法手動解鎖 //lock.lock(10, TimeUnit.SECONDS); // 3. 嘗試加鎖,最多等待3秒,上鎖以后10秒自動解鎖 boolean res = lock.tryLock(3, 10, TimeUnit.SECONDS); if(res){ //成功 // do your business } } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }

Redisson同時還為分布式鎖提供了異步執行的相關方法:

    1. public void testAsyncReentrantLock(RedissonClient redisson){ RLock lock = redisson.getLock("anyLock"); try{ lock.lockAsync(); lock.lockAsync(10, TimeUnit.SECONDS); Future res = lock.tryLockAsync(3, 10, TimeUnit.SECONDS); if(res.get()){ // do your business } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } finally { lock.unlock(); } }

2. 公平鎖(Fair Lock)

Redisson分布式可重入公平鎖也是實現了java.util.concurrent.locks.Lock接口的一種RLock對象。在提供了自動過期解鎖功能的同時,保證了當多個Redisson客戶端線程同時請求加鎖時,優先分配給先發出請求的線程。

    1. public void testFairLock(RedissonClient redisson){ RLock fairLock = redisson.getFairLock("anyLock"); try{ // 最常見的使用方法 fairLock.lock(); // 支持過期解鎖功能, 10秒鍾以后自動解鎖,無需調用unlock方法手動解鎖 fairLock.lock(10, TimeUnit.SECONDS); // 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖 boolean res = fairLock.tryLock(100, 10, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } finally { fairLock.unlock(); } }

Redisson同時還為分布式可重入公平鎖提供了異步執行的相關方法:

    1. RLock fairLock = redisson.getFairLock("anyLock"); fairLock.lockAsync(); fairLock.lockAsync(10, TimeUnit.SECONDS); Future res = fairLock.tryLockAsync(100, 10, TimeUnit.SECONDS);

3. 聯鎖(MultiLock)

Redisson的RedissonMultiLock對象可以將多個RLock對象關聯為一個聯鎖,每個RLock對象實例可以來自於不同的Redisson實例。

    1. public void testMultiLock(RedissonClient redisson1, RedissonClient redisson2, RedissonClient redisson3){ RLock lock1 = redisson1.getLock("lock1"); RLock lock2 = redisson2.getLock("lock2"); RLock lock3 = redisson3.getLock("lock3"); RedissonMultiLock lock = new RedissonMultiLock(lock1, lock2, lock3); try { // 同時加鎖:lock1 lock2 lock3, 所有的鎖都上鎖成功才算成功。 lock.lock(); // 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }

4. 紅鎖(RedLock)

Redisson的RedissonRedLock對象實現了Redlock介紹的加鎖算法。該對象也可以用來將多個RLock

對象關聯為一個紅鎖,每個RLock對象實例可以來自於不同的Redisson實例。

    1. public void testRedLock(RedissonClient redisson1, RedissonClient redisson2, RedissonClient redisson3){ RLock lock1 = redisson1.getLock("lock1"); RLock lock2 = redisson2.getLock("lock2"); RLock lock3 = redisson3.getLock("lock3"); RedissonRedLock lock = new RedissonRedLock(lock1, lock2, lock3); try { // 同時加鎖:lock1 lock2 lock3, 紅鎖在大部分節點上加鎖成功就算成功。 lock.lock(); // 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖 boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } }

5. 讀寫鎖(ReadWriteLock)

Redisson的分布式可重入讀寫鎖RReadWriteLock Java對象實現了java.util.concurrent.locks.ReadWriteLock接口。同時還支持自動過期解鎖。該對象允許同時有多個讀取鎖,但是最多只能有一個寫入鎖。

    1. RReadWriteLock rwlock = redisson.getLock("anyRWLock"); // 最常見的使用方法 rwlock.readLock().lock(); // 或 rwlock.writeLock().lock(); // 支持過期解鎖功能 // 10秒鍾以后自動解鎖 // 無需調用unlock方法手動解鎖 rwlock.readLock().lock(10, TimeUnit.SECONDS); // 或 rwlock.writeLock().lock(10, TimeUnit.SECONDS); // 嘗試加鎖,最多等待100秒,上鎖以后10秒自動解鎖 boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS); // 或 boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS); ... lock.unlock();

6. 信號量(Semaphore)

Redisson的分布式信號量(Semaphore)Java對象RSemaphore采用了與java.util.concurrent.Semaphore相似的接口和用法。

    1. RSemaphore semaphore = redisson.getSemaphore("semaphore"); semaphore.acquire(); //或 semaphore.acquireAsync(); semaphore.acquire(23); semaphore.tryAcquire(); //或 semaphore.tryAcquireAsync(); semaphore.tryAcquire(23, TimeUnit.SECONDS); //或 semaphore.tryAcquireAsync(23, TimeUnit.SECONDS); semaphore.release(10); semaphore.release(); //或 semaphore.releaseAsync();

7. 可過期性信號量(PermitExpirableSemaphore)

Redisson的可過期性信號量(PermitExpirableSemaphore)實在RSemaphore對象的基礎上,為每個信號增加了一個過期時間。每個信號可以通過獨立的ID來辨識,釋放時只能通過提交這個ID才能釋放。

    1. RPermitExpirableSemaphore semaphore = redisson.getPermitExpirableSemaphore("mySemaphore"); String permitId = semaphore.acquire(); // 獲取一個信號,有效期只有2秒鍾。 String permitId = semaphore.acquire(2, TimeUnit.SECONDS); // ... semaphore.release(permitId);

8. 閉鎖(CountDownLatch)

Redisson的分布式閉鎖(CountDownLatch)Java對象RCountDownLatch采用了與java.util.concurrent.CountDownLatch相似的接口和用法。

分布式鎖的實現方式也有很多種,基於zookeeper的,基於分布式數據庫的以及本文我們要介紹的基於Redis的。為了系統而全面的介紹基於Redis的實現方案,本文主要涉及如下內容:

    1. Redis單節點鎖的實現
    2. Redis集群鎖的實現(RedLock)
    3. Redis鎖的使用場景分析
    4. 一個RedLock的python實現及需要的改進方案

單節點Redis的鎖實現

單節點Redis的鎖實現主要有三大點: SETNX的使用、超時時間設置以及釋放鎖前先檢查鎖,下面一一介紹。

SETNX

前面我們說過,鎖的實現一定是要基於各個工作流都能看到的公共區域實現。而Redis本身作為一個內存數據庫就天然的適合鎖的實現。就跟擊鼓傳花似的,只要有一個進程拿到了這個花,別的進程就必須得有辦法知道這個花被占用了,這樣他就可以原地等待直到這個花被釋放出來,各個進程再去搶這朵花。Redis的SETNX命令非常適合排他性的鎖實現。該命令只會在鍵不存在的情況下才會為鍵設置值,一個進程申明獨占某個資源的方式就是對一個鍵使用SETNX,這樣當別的進程再去使用這個命令設置這個鍵的時候就會失敗進而無法獲得鎖,這樣滿足了我們的前言中鎖的互斥性原則。這時我們的程序和我們的鎖獲得與釋放可以這么寫。

def Job(): identifier = str(uuid.uuid4()) lock_name = "mylock" try: #一直循環等待獲得鎖 while True: #如果獲得鎖c成功就完成工作並退出循環 if lock(lock_name): do_something() break finally: #釋放鎖 unlock(lock_name) def lock(lock_name): return redis.setnx(lock_name, identifier) def unlock(lock_name): redis.delete(lock_name) 

超時時間設置

但是只用上述方法,避免死鎖的性質能否被滿足呢?假設當前獲得鎖的進程掛掉了,那么由於當前進程沒有釋放鎖,那么其他的進程就會永遠都沒法獲得鎖(因為setnx命令會一直返回False)。所以我們不能只依賴工作進程自己主動去釋放鎖。還需要給鎖加一個生存期間,如果鎖過了生存期間還沒有被釋放,則Redis強制釋放該鎖。這樣,即使獲得鎖的進程掛掉了,其他的進程還是可以在一段時間后獲得鎖而不會現如死鎖的困境。Redis的超時限制特性可以解決這個問題。此時,我們的鎖獲得和釋放可以這么寫:

def lock(lock_name, lock_timeout): if redis.setnx(lock_name, identifier): redis.expire(lock_name,lock_timeout) return True return False def unlock(lock_name): redis.delete(lock_name)

釋放鎖時先檢查

帶有超時特性的鎖滿足了避免死鎖的性質,但是這種auto release的機制的卻很有可能破壞鎖的互斥性質。舉個例子,比如當進程A獲得了鎖,並設置鎖的超期時間為10s,進程A由於處理任務花費的時間較長,10s后任務還沒處理完,但是此時鎖已經過期被釋放了,進程B重新獲得了鎖(不要忘了,鎖實際上是對資源獨占的一種申明)。這個時候由於進程A沒有主動釋放鎖,進程B又獲得了鎖,對A、B來講,他們都認為自己獨占了資源,當他們按照獨占資源的想法去操作資源的時候就可能會導致沖突。同時還存在的另一個問題,A的鎖由於超時被釋放了且B重新獲得了鎖,但是A並不知道自己的鎖已經被釋放了,A做完處理工作之后開始釋放鎖,然而這時釋放的其實是B的鎖(因為都刪除的是mylock鍵)。B吃着火鍋唱着歌,回頭一看,鎖沒了==,很糟糕。對於第一個問題的解決方案我們后續介紹,但是對於第二個誤釋放別人的鎖,我們可以在unlock中使用如下步驟釋放鎖:

  • 從redis中使用GET命令得到mylock的值,並檢查鎖對應的值是不是自己當時存的identifier
  • 如果是,那就使用DELETE釋放,如果不是,說明自己的鎖已經被自動釋放了,則不做任何處理。

為保證上述整個操作的原子性,防止在GET之后,DELETE之前的期間Redis恰巧把鎖給自動釋放了,一般把上述的過程寫到一個Lua的腳本中提交給Redis執行,因為Redis執行Lua腳本中的命令是原子性質的。

代碼如下:

def unlock(lock_nameidentifier): unlock_script = """  if redis.call("get",KEYS[1]) == ARGV[1] then  return redis.call("del",KEYS[1])  else  return 0  end""" redis.eval(unlock_script, 1, lock_name, identifier) 

到這里Redis單節點的鎖實現已經基本介紹完了,主要有三點

  1. 使用SETNX實現排他性鎖
  2. 使用超時限制特性來避免死鎖
  3. 釋放鎖的時候需要進行檢查來避免誤釋放別的進程的鎖

那么我們接下來要討論的一個問題是:如果在使用的過程中Redis突然掛了會怎樣?Redis介紹1中介紹過Redis是有持久化機制的。當Redis掛了之后,再次恢復的時候可以從磁盤上把數據恢復出來。

但是問題在於如果A進程獲得了鎖(即在Redis中使用SETNX設置了鍵),在Redis把這個操作持久化到磁盤之前Redis就掛了的話,那么Redis再次啟動的時候,通過磁盤持久化文件恢復出來的數據中是沒有這個鎖的信息的。當B去嘗試獲得鎖時發現自己可以獲得鎖,但是此時A認為自己還沒有釋放鎖,於是又導致了A、B兩個進程同時認為自己持有鎖的情況發生,破壞了性質1。

上述的問題主要發生本質原因還是在於單點問題,當我們只依賴單個Redis節點時,我們就只能承受單點不可靠帶來的風險。而我們都知道,在分布式系統中,常用的應對單點風險的解決方案就是冗余節點。那我們能不能多啟動幾個Redis,保留鍵的多副本,這樣即使一個Redis因為意外掛掉了,我們也可以使用別的Redis服務器繼續正常服務?答案是YES! 這就是我們接下來要介紹的RedLock。

RedLock

RedLock是Redis之父Salvatore Sanfilippo提出來的基於多個Redis實例的分布式鎖的實現方案。其核心思想就在於使用多個Redis冗余實例來避免單Redis實例的不可靠性。比如我們采用5個Redis實例,我們可以把5個Redis全部部署到同一台機器上,也可以把5個Redis部署在5個不同的機器上。一般為了實現更好的讀寫性能以及抗風險能力,我們選擇部署5個Redis在5個機器上。

基於Redis的鎖的實現本質都是針對數據庫的讀寫操作。那么采用5個Redis節點我們就需要考慮副本讀寫的一致性問題。基於不同的准則,我們有不同的權衡,比如寫入的副本一致性,可以要求到只要一個節點寫入成功則成功,或者依據法團准則,寫入(N/2 +1)個節點后才成功,或者寫入所有的節點后才成功等等。RedLock采用的就是依據法團准則的方案:

多副本情況下的一致性准則

假如我們現在有5台機器上跑着5個Redis服務器,RedLock的獲取步驟如下:

  1. 依次向每個Redis服務器獲取鎖(即使用SETNX設置鍵值), 如果獲得至少(N/2 +1)個服務器的鎖(在本例中就是3),則認為獲得鎖成功
  2. 如果第一步中獲得的鎖的個數少於3個,則認為獲得鎖失敗。為保證其他節點獲得鎖正常,在所有Redis節點上釋放鎖(因為有可能有的節點設置成功了鎖)

釋放鎖的過程也很簡單,就和上述的單節點的鎖釋放步驟一致,只不過改成了在所有的節點上都執行一遍鎖釋放。

那我們先來看看這樣是否能夠解決如果一個Redis節點掛了出現鎖被同時兩個客戶端獲得的情況。假設我們總共有5台機器,客戶端A從R1,R2,R3上獲得了鎖,但是在R1未來得及把這個操作持久化到磁盤上時,R1掛掉了。此時R1重啟之后,其從磁盤上恢復的數據並沒有A的鎖的信息,所以進程B可以從R1,R4,R5再次獲得鎖(滿足法團協議),這樣就又造成了沖突。為解決這個問題,Redis作者提出了延遲重啟的解決方案。

延遲重啟

假設R1掛掉了之后,我們不再讓R1提供服務會怎么樣呢?首先可以保證的時,上述的A、B同時獲得鎖的情況不會發生,因為B最多從R4,R5獲得兩個鎖,不滿足法團協議。但是顯然我們不能讓R1永遠的不提供服務。但我們可以讓他等一會再重新對外提供服務,那得等待多久呢?我們可以發現的是,受R1掛了然后接着重啟這件事影響的鎖只是在R1掛的那個時刻R1上存的所有鎖,之后創建的新鎖或者沒在R1上存儲的鎖都不受R1掛了這件事的影響。而我們前面又知道,在設置鎖的時候,為避免陷入死鎖的困境,我們給每個鎖設置了一個過期時間。那R1只需要等到R1掛掉的那個時刻其上面所有的鍵都過期之后再對外提供服務即可,即可以等待一個所有鍵的MAX TTL即可。但是這個MAX TTL我們是沒法只通過統計R1上的鍵准確的知道的,因為R1有一部分鍵的信息由於沒有持久化到磁盤上已經丟失了。但是為了保險,我們可以通過統計當前時刻所有機器上的MAX TTL,然后取所有機器的MAX TTL即可。這樣我們就可以保證R1加入服務后,其上所有的鎖都肯定已經失效了。有了延遲重啟和多Redis實例的解決方案,我們對Redis節點可能會掛這個風險有了更強的的抵抗能力。

但是軟件行業里顯然沒有任何銀彈方案。引入了副本在提升魯棒性的同時也對整個系統引入了復雜性和不確定性。我們來看這樣一個例子:我們有5個Redis服務器,客戶端A試圖獲得一個超時期限為10s的鎖。按照上述的流程,我們是從R1到R5依次嘗試獲得鎖mylock,當前時間戳假設是12300

  1. 我們先從R1獲得了鎖,此時R1機器上記錄的mylock的到期時間戳為12310
  2. 我們再嘗試從R2獲得鎖,由於網絡的問題,等R2獲得請求時,時間已經到了12302了,那么R2機器上記錄的mylock的到期時間戳即為12312
  3. 同理,當我們再次嘗試從R3獲得鎖時,網絡暢通,當前時間戳仍然是12302,R3上記錄的mylock的到期時間戳為12312

發現了么,R1, R2,R3的到期時間戳是不一樣的。如果我們按照三個機器的最大時間戳來當作mylock的過期時間戳會導致如果客戶端B在時間戳為12311時嘗試獲得mylock鎖,由於R1中mylock已經過期,則B從R1,R4,R5獲得鎖,滿足法團協議,獲得獲得鎖成功,此時出現A、B同時得到鎖。所以顯然不能使用最大時間戳來當作過期時間戳,使用理論時間戳(即開始設置時,本地機器的時間戳+TTL)是最保險的方案,因為他肯定是最小的。為了避免在獲取鎖的過程中因為網絡的問題占用了過多的鎖可使用時間,每次從一個機器獲取鎖的時候都在網絡上只等一個非常小的時間,超時還未獲得鎖就立馬嘗試下一個節點。

到這似乎問題都被解決了,那是不是RedLock就真的完美了么?顯然不是

RedLock的問題

RedLock可能的最大問題在於對各個機器時間流速的一致性假設。什么叫時間流速一致性假設呢?就是機器A上過了一分鍾,機器B上也過了幾乎一分鍾。讀者看到這會想,這不是廢話么,難道還有流速不一致的情況。其問題在於,一個機器的時間是有可能跳變的。比如管理員重新校正機器時間,或者機器的時鍾模塊收到外部更新信號,重新校對時間等。這就有可能導致如下的情況出現:客戶端A從R1,R2,R3獲得了鎖,並設置了過期時間為10s,但是在其中的某個時刻,可能R1的時間被重新校正,“快進了10s”,調整完時間之后,R1上的鎖就已經過期了。此時B再次申請同樣的鎖,則可以從R1,R4,R5獲得鎖,滿足法團協議。獲得鎖成功。當然,這種問題發生的概率可能是足夠低的,能不能承受這樣的情況帶來的損失決定着是否采用Redis來實現分布式鎖。

第二個問題出現在auto release的機制上。auto release的使用解決了持有鎖的進程不能正常釋放鎖導致的死鎖問題,但是同時帶來的問題可能就是如下這種情況,即客戶端1還沒使用完鎖但是鎖已經過期了,這時客戶端2獲得了鎖,結果客戶端1、客戶端2都對資源進行了使用。當然,設置過期時間實際上是租約機制的一種,但是RedLock的算法中沒有提到續租的相關機制。后來開源的實現中,Java的redisson實現了基於watchdog的續租機制,就比較好的緩解了該問題。

那RedLock有這樣的問題的話,我們是否該用RedLock呢?我覺得回答這個問題我們首先需要明確我們為什么需要使用鎖。這個問題我覺得Martin的博客中總結的非常好,所以直接拿來用。即用鎖主要有兩個目的,第一便是為了效率着想,比如我們不想讓一個耗時的任務被重復的執行,第二個目的便是為了程序的正確性考慮。比如12306的訂票問題,如果不使用鎖,很容易出現剩余一張票,但是10個人都在網站上搶到了這張票的情況。

  • Efficiency: Taking a lock saves you from unnecessarily doing the same work twice (e.g. some expensive computation). If the lock fails and two nodes end up doing the same piece of work, the result is a minor increase in cost (you end up paying 5 cents more to AWS than you otherwise would have) or a minor inconvenience (e.g. a user ends up getting the same email notification twice).
  • Correctness: Taking a lock prevents concurrent processes from stepping on each others’ toes and messing up the state of your system. If the lock fails and two nodes concurrently work on the same piece of data, the result is a corrupted file, data loss, permanent inconsistency, the wrong dose of a drug administered to a patient, or some other serious problem.

當我們基於Efficiency的目標的時候,基於Redis的的分布式鎖是很好的實現方式,但是實際上如果只是為了Efficiency考慮的話,RedLock的使用就沒有必要了,基於單節點的Redis分布式鎖就完全能夠滿足需要。如果程序的正確性嚴格的依賴於鎖的使用的話,那么就看用戶是否能夠承受可能的時間不一致性帶來的風險,如果能,那么就可以使用RedLock,如果不能那么就不要使用RedLock。那么使用什么呢?雖然我沒用過,但是貌似聽說常用的解決方案就是基於zookeeper的分布式鎖。

RedLock的分布式實現

俗話說得好,Talk is cheap, show me the code。RedLock的思想本身不復雜,所以實現也非常的簡單。這里給出的RedLock的python實現是Redis官網列出來的實現,應該是比較靠譜的。源代碼很簡單,就放個git鏈接給大家自己去研究吧。對着上面的RedLock和單機版的鎖實現,這段代碼大家就秒懂了。

但我個人認為這個python的實現有個需要完善的地方在於沒有watchdog的機制來實現續租,包括這個開源代碼中的issue中也提到了這個問題。java的redisson實現了watchdog的feature,但是這個里面沒有。我覺得基於watchdog的續租需要滿足如下的特性:

  1. 如果程序正常運行,但是鎖的到期時間快到了,那么就應該續租
  2. 如果程序終止,則停止續租
  3. 如果程序使用鎖期間,陷入死循環,則停止續租。

就目前而言,我覺得在python里面實現鎖的續租機制的話,基於多線程應該是個比較好的解決方案之一。具體的研究和實現可以在下一次的博客進行分享~

總結

本篇介紹了基於單節點Redis服務器的分布式鎖實現以及為了應對單點風險而基於多節點的分布式鎖RedLock實現。無論是單節點鎖還是多節點鎖,整體的思想還是比較簡單的。如果要用基於Redis的鎖,我們一定要先衡量我們的場景對鎖的要求是否足夠嚴格,如果有非常嚴格的正確性要求,那么就可能要三思一下是否使用基於Redis的分布式鎖了。

 

redis分布式鎖就幾個方法

1、setnx(key,value) 返回boolean 1為獲取鎖 0為沒獲取鎖

2、expire() 設置鎖的有效時間

3、getSet(key,value) 獲取鎖當前key對應的鎖的有效時間

4、deleteKey() 刪除鎖

setnx(lockkey, 當前時間+過期超時時間),如果返回 1,則獲取鎖成功;如果返回 0 則沒有獲取到鎖,轉向 2。
get(lockkey) 獲取值 oldExpireTime ,並將這個 value 值與當前的系統時間進行比較,如果小於當前系統時間,則認為這個鎖已經超時,可以允許別的請求重新獲取,轉向 3。
計算 newExpireTime = 當前時間+過期超時時間,然后 getset(lockkey, newExpireTime) 會返回當前 lockkey 的值currentExpireTime。
判斷 currentExpireTime 與 oldExpireTime 是否相等,如果相等,說明當前 getset 設置成功,獲取到了鎖。如果不相等,說明這個鎖又被別的請求獲取走了,那么當前請求可以直接返回失敗,或者繼續重試。
在獲取到鎖之后,當前線程可以開始自己的業務處理,當處理完畢后,比較自己的處理時間和對於鎖設置的超時時間,如果小於鎖設置的超時時間,則直接執行 delete 釋放鎖;如果大於鎖設置的超時時間,則不需要再鎖進行處理。


過程分析:

當A通過setnx(lockkey,currenttime+timeout)命令能成功設置lockkey時,即返回值為1,過程與原理1一致;
當A通過setnx(lockkey,currenttime+timeout)命令不能成功設置lockkey時,這是不能直接斷定獲取鎖失敗;因為我們在設置鎖時,設置了鎖的超時時間timeout,當當前時間大於redis中存儲鍵值為lockkey的value值時,可以認為上一任的擁有者對鎖的使用權已經失效了,A就可以強行擁有該鎖;具體判定過程如下;
A通過get(lockkey),獲取redis中的存儲鍵值為lockkey的value值,即獲取鎖的相對時間lockvalueA
lockvalueA!=null && currenttime>lockvalue,A通過當前的時間與鎖設置的時間做比較,如果當前時間已經大於鎖設置的時間臨界,即可以進一步判斷是否可以獲取鎖,否則說明該鎖還在被占用,A就還不能獲取該鎖,結束,獲取鎖失敗;
步驟4返回結果為true后,通過getSet設置新的超時時間,並返回舊值lockvalueB,以作判斷,因為在分布式環境,在進入這里時可能另外的進程獲取到鎖並對值進行了修改,只有舊值與返回的值一致才能說明中間未被其他進程獲取到這個鎖
lockvalueB == null || lockvalueA==lockvalueB,判斷:若果lockvalueB為null,說明該鎖已經被釋放了,此時該進程可以獲取鎖;舊值與返回的lockvalueB一致說明中間未被其他進程獲取該鎖,可以獲取鎖;否則不能獲取鎖,結束,獲取鎖失敗。

舉個例子:

現在有兩台redis服務器,兩個進程同時訪問redis代理,並且代理按順序指向redis服務器,當訪問A服務器時候,在setnx()執行后並且expiro()執行前A宕機了,這時候B在執行的時候就先去判斷 系統當前時間是否大於oldtime+expiro()設置的時間設置的時間,如果大於oldtime+expiro()設置的時間,可以證明A事物的鎖使用權已經失效了,我們就可以刪除事物A的鎖,然后在事物B上重新生成個鎖。

 

1、什么是分布式鎖

分布式鎖就是 多個服務器都有redis,但是共用同一套資源。

2、分布式鎖實現原理

主要就兩個方法

1、getlock() 獲取鎖方法

2、releaselock()釋放鎖方法

然后我們看一下 getlock()方法是怎么寫的

當生成鎖的時候會有一個key也就是上面的taskId,existskey()意思是在分布式的key中是否有和taskId一致的(這個taskId可以認識取隨機數),如果沒有一致的就獲取鎖

然后是releaselock()這個方法里就執行了 deletekey()刪除key方法

 

上述做法有幾個問題

1、當獲取鎖之后同時還沒有刪除key,這時候斷網了,那么就會導致我這個鎖永遠都無法delete

2、同一時間被不同服務器的調用獲取到鎖

先說第一個問題:我們可以通過設置一個釋放鎖的時間來解決這個問題比如2秒,如果斷網了或者其他問題2秒之后自動釋放

在說第二個問題:獲取鎖進行原子性,也就是說在獲取鎖的時候多一步操作,就是當前key不存在時候才可以獲取鎖

SET my_key my_value NX PX milliseconds

其中,NX表示只有當鍵key不存在的時候才會設置key的值,PX表示設置鍵key的過期時間,單位是毫秒。

 

就算是這樣還會有問題:

3、當時間設置2秒,但是我的邏輯代碼執行了3秒,這時候這個鎖會被別的請求獲取到

其實解決也很簡單,我們在釋放鎖的時候設置一個隨機數,在進行deletekey的時候進行隨機數的判斷如果相同才delete當然這個隨機數是在邏輯方法執行完之后生成的

 

本文主要給大家介紹了關於redis實現加鎖的幾種方法,分享出來供大家參考學習,下面話不多說了,來一起看看詳細的介紹吧。

1. redis加鎖分類

redis能用的的加鎖命令分表是INCR、SETNX、SET

2. 第一種鎖命令INCR

這種加鎖的思路是, key 不存在,那么 key 的值會先被初始化為 0 ,然后再執行 INCR 操作進行加一。
然后其它用戶在執行 INCR 操作進行加一時,如果返回的數大於 1 ,說明這個鎖正在被使用當中。

    1、 客戶端A請求服務器獲取key的值為1表示獲取了鎖 

    2、 客戶端B也去請求服務器獲取key的值為2表示獲取鎖失敗

    3、 客戶端A執行代碼完成,刪除鎖

    4、 客戶端B在等待一段時間后在去請求的時候獲取key的值為1表示獲取鎖成功

    5、 客戶端B執行代碼完成,刪除鎖

1
2
$redis->incr($key);
$redis->expire($key, $ttl); //設置生成時間為1秒

3. 第二種鎖SETNX

這種加鎖的思路是,如果 key 不存在,將 key 設置為 value

如果 key 已存在,則 SETNX 不做任何動作

    1、 客戶端A請求服務器設置key的值,如果設置成功就表示加鎖成功

    2、 客戶端B也去請求服務器設置key的值,如果返回失敗,那么就代表加鎖失敗

    3、 客戶端A執行代碼完成,刪除鎖

    4、 客戶端B在等待一段時間后在去請求設置key的值,設置成功

    5、 客戶端B執行代碼完成,刪除鎖   

1
2
$redis->setNX($key, $value);
$redis->expire($key, $ttl);

4. 第三種鎖SET

上面兩種方法都有一個問題,會發現,都需要設置 key 過期。那么為什么要設置key過期呢?如果請求執行因為某些原因意外退出了,導致創建了鎖但是沒有刪除鎖,那么這個鎖將一直存在,以至於以后緩存再也得不到更新。於是乎我們需要給鎖加一個過期時間以防不測。

但是借助 Expire 來設置就不是原子性操作了。所以還可以通過事務來確保原子性,但是還是有些問題,所以官方就引用了另外一個,使用 SET 命令本身已經從版本 2.6.12 開始包含了設置過期時間的功能。

    1、 客戶端A請求服務器設置key的值,如果設置成功就表示加鎖成功

    2、 客戶端B也去請求服務器設置key的值,如果返回失敗,那么就代表加鎖失敗

    3、 客戶端A執行代碼完成,刪除鎖

    4、 客戶端B在等待一段時間后在去請求設置key的值,設置成功

    5、 客戶端B執行代碼完成,刪除鎖

1
$redis->set($key, $value, array('nx', 'ex' => $ttl)); //ex表示秒

5. 其它問題

雖然上面一步已經滿足了我們的需求,但是還是要考慮其它問題?

    1、 redis發現鎖失敗了要怎么辦?中斷請求還是循環請求?

    2、 循環請求的話,如果有一個獲取了鎖,其它的在去獲取鎖的時候,是不是容易發生搶鎖的可能?

    3、 鎖提前過期后,客戶端A還沒執行完,然后客戶端B獲取到了鎖,這時候客戶端A執行完了,會不會在刪鎖的時候把B的鎖給刪掉?

6. 解決辦法

針對問題1:使用循環請求,循環請求去獲取鎖

針對問題2:針對第二個問題,在循環請求獲取鎖的時候,加入睡眠功能,等待幾毫秒在執行循環

針對問題3:在加鎖的時候存入的key是隨機的。這樣的話,每次在刪除key的時候判斷下存入的key里的value和自己存的是否一樣

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
do { //針對問題1,使用循環
   $timeout = 10;
   $roomid = 10001;
   $key = 'room_lock';
   $value = 'room_'.$roomid; //分配一個隨機的值針對問題3
   $isLock = Redis::set($key, $value, 'ex', $timeout, 'nx');//ex 秒
   if ($isLock) {
     if (Redis::get($key) == $value) { //防止提前過期,誤刪其它請求創建的鎖
       //執行內部代碼
       Redis::del($key);
       continue;//執行成功刪除key並跳出循環
     }
   } else {
     usleep(5000); //睡眠,降低搶鎖頻率,緩解redis壓力,針對問題2
   }
} while(!$isLock);

7. 另外一個鎖

以上的鎖完全滿足了需求,但是官方另外還提供了一套加鎖的算法,這里以PHP為例

1
2
3
4
5
6
7
8
9
10
11
12
13
$servers = [
   ['127.0.0.1', 6379, 0.01],
   ['127.0.0.1', 6389, 0.01],
   ['127.0.0.1', 6399, 0.01],
];
 
$redLock = new RedLock($servers);
 
//加鎖
$lock = $redLock->lock('my_resource_name', 1000);
 
//刪除鎖
$redLock->unlock($lock)

上面是官方提供的一個加鎖方法,就是和第6的大體方法一樣,只不過官方寫的更健壯。所以可以直接使用官方提供寫好的類方法進行調用。官方提供了各種語言如何實現鎖。

總結

以上就是這篇文章的全部內容了,希望本文的內容對大家的學習或者工作能帶來一定的幫助,如果有疑問大家可以留言交流,謝謝大家對腳本之家的支持。

redis和redission分布式鎖原理及區別

我最近做租車項目,在處理分布式時用到分布式鎖,我發現很多同事都在網上找分布式鎖的資料,但是看的資料都不是很全,所以在這里我談談自己的分布式鎖理解。

結合我的其中某一業務需求:多個用戶在同一個區域內發現只有一輛可租的車,最終結果肯定只有一位用戶租車成功,這就產生了多線程(多個用戶)搶同一資源的問題。

1、有的同伴想到了synchronized關鍵字鎖,暫且拋開性能問題,項目為了高可用,都會做集群部署,那么synchronized就失去了加鎖的意義,這里多嘴解釋一下:

2、有的小伙伴可能想到了樂觀鎖,沒錯!!樂觀鎖可以解決的我的問題,但是在高並發的場景,頻繁的操作數據庫,數據庫的資源是很珍貴的,並且還存在性能的問題。
但是我這里簡單說下樂觀鎖的使用:

我們在車的表中添加一個字段:version(int類型)(建議使用這個名稱,這樣別人看到就會直覺這是樂觀鎖字段,也可以使用別的名稱)
查詢出該車的數據,數據中就有version字段,假如version=1
select * from u_car where car_id = 10;

修改該車的狀態為鎖定,
update u_car set status = 2,version = version +1 where car_id = 10 and version = 1
在修改的時候將version作為參數,如果其他用戶鎖車,那么version已經發生變化(version = version +1),所以version = 1不成立,修改失敗

樂觀鎖不是本次的終點,但還是簡單說下;

3、使用redis的分布式鎖:

public boolean lock(String key, V v, int expireTime){
//獲取鎖
//在redis早期版本中,設置key和key的存活時間是分開的,設置key成功,但是設置存活時間時服務宕機,那么你的key就永遠不會過期,有BUG
//后來redis將加鎖和設置時間用同一個命令
//這里是重點,redis.setNx(key,value,time)方法是原子性的,設置key成功說明鎖車成功,如果失敗說明該車被別人租了

boolean b = false;
try {
b = redis.setNx(key, v, expireTime);
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return b;
}
public boolean unlock(String key){
return redis.delete(key);
}
}

但是這樣寫還是存在BUG的,我的key設置了加鎖時間為5秒,但是我的業務邏輯5秒還沒有執行完成,key過期了,那么其他用戶執行redis.setNx(key, v, expireTime)時就成功了,將該車鎖定,又產生了搶資源;
我們想一下,如果我能夠在業務邏輯沒有執行完的時候,讓鎖過期后能夠延長鎖的時間,是不是就解決了上面的BUG;
實現這個鎖的延長,非要自己動手的話就得另啟一個線程來監聽我們的業務線程,每隔1秒監測當前業務線程是否執行完成,如果沒有就獲取key的存活時間,時間小於一個閾值時,就自動給key設置N秒;
當然,我們可以不用自己動手,redission已經幫我們實現key的時間時間過期問題;

4、使用redission的分布式鎖:

//引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.10.6</version>
</dependency>

redisson支持單點、集群等模式,這里選擇單點的。application.yml配置好redis的連接:

spring:
redis:
host: 127.0.0.1
port: 6379
password:

配置redisson的客戶端bean

@Configuration
public class RedisConfig {
    @Value("${spring.redis.host}")
    private String host;
 
    @Bean(name = {"redisTemplate", "stringRedisTemplate"})
    public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {
        StringRedisTemplate redisTemplate = new StringRedisTemplate();
        redisTemplate.setConnectionFactory(factory);
        return redisTemplate;
    }
 
    @Bean
    public Redisson redisson() {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://" + host + ":6379");
        return (Redisson) Redisson.create(config);
    }
 
}

加鎖使用

private Logger log = LoggerFactory.getLogger(getClass());
@Resource
private Redisson redisson;
//加鎖
public Boolean lock(String key,long waitTime,long leaseTime){
	Boolean  b = false;
	try {
        RLock rLock = redisson.getLock(key);
        //說下參數 waitTime:鎖的存活時間 leaseTime:鎖的延長時間 后面的參數是單位
        b = rLock.tryLock(waitTime,leaseTime,TimeUnit.SECONDS);
      } catch (Exception e) {
         log.error(e.getMessage(), e);
      } 
    }
    return b;
}
//釋放鎖
public void unlock(String key){
	try {
		RLock rLock = redisson.getLock(key);
		if(null!=lock){
			lock.unlock();
			lock.forceUnlock();
			fileLog.info("unlock succesed");
    	}
    } catch (Exception e) {
        fileLog.error(e.getMessage(), e);
    }
}
————————————————

public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedException {
        long time = unit.toMillis(waitTime);
        long current = System.currentTimeMillis();
        long threadId = Thread.currentThread().getId();
        //嘗試獲取鎖,如果沒取到鎖,則獲取鎖的剩余超時時間
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        if (ttl == null) {
            return true;
        }
        //如果waitTime已經超時了,就返回false
        time -= System.currentTimeMillis() - current;
        if (time <= 0) {
            acquireFailed(threadId);
            return false;
        }
        
        current = System.currentTimeMillis();
        RFuture<RedissonLockEntry> subscribeFuture = subscribe(threadId);
        if (!await(subscribeFuture, time, TimeUnit.MILLISECONDS)) {
            if (!subscribeFuture.cancel(false)) {
                subscribeFuture.onComplete((res, e) -> {
                    if (e == null) {
                        unsubscribe(subscribeFuture, threadId);
                    }
                });
            }
            acquireFailed(threadId);
            return false;
        }
 
        try {
            time -= System.currentTimeMillis() - current;
            if (time <= 0) {
                acquireFailed(threadId);
                return false;
            }
            //進入死循環,反復去調用tryAcquire嘗試獲取鎖,ttl為null時就是別的線程已經unlock了
            while (true) {
                long currentTime = System.currentTimeMillis();
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    return true;
                }
 
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
 
                // waiting for message
                currentTime = System.currentTimeMillis();
                if (ttl >= 0 && ttl < time) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
                }
 
                time -= System.currentTimeMillis() - currentTime;
                if (time <= 0) {
                    acquireFailed(threadId);
                    return false;
                }
            }
        } finally {
            unsubscribe(subscribeFuture, threadId);
        }
//        return get(tryLockAsync(waitTime, leaseTime, unit));
    }
————————————————

 

可以看到,其中主要的邏輯就是嘗試加鎖,成功了就返回true,失敗了就進入死循環反復去嘗試加鎖。中途還有一些超時的判斷。邏輯還是比較簡單的。

再看看tryAcquire方法

 

這個方法的調用棧也是比較多,之后會進入下面這個方法

 

上面的lua(俗稱膠水語言)腳本比較重要,主要是為了執行命令的原子性解釋一下:
KEYS[1]代表你的key
ARGV[1]代表你的key的存活時間,默認存活30秒
ARGV[2]代表的是請求加鎖的客戶端ID,后面的1則理解為加鎖的次數,簡單理解就是 如果該客戶端多次對key加鎖時,就會執行hincrby原子加1命令
第一段if就是判斷你的key是否存在,如果不存在,就執行redis call(hset key ARGV[2],1)加鎖和設置redis call(pexpire key ARGV[1])存活時間;
當第二個客戶來加鎖時,第一個if判斷已存在key,就執行第二個if判斷key的hash是否存在客戶端2的ID,很明顯不是;
則進入到最后的return返回該key的剩余存活時間

當加鎖成功后會在后台啟動一個watch dog(看門狗)線程,key的默認存活時間為30秒,則watch dog每隔10秒鍾就會檢查一下客戶端1是否還持有該鎖,如果持有,就會不斷的延長鎖key的存活時間

所以這里建議大家在設置key的存活時間時,最好大於10秒,延續時間也大於等於10秒
所以,總體流程應該是這樣的。
————————————————

一、Jedis,Redisson,Lettuce 三者的區別

共同點:都提供了基於 Redis 操作的 Java API,只是封裝程度,具體實現稍有不同。

不同點:

  • 1.1、Jedis

是 Redis 的 Java 實現的客戶端。支持基本的數據類型如:String、Hash、List、Set、Sorted Set。

特點:使用阻塞的 I/O,方法調用同步,程序流需要等到 socket 處理完 I/O 才能執行,不支持異步操作。Jedis 客戶端實例不是線程安全的,需要通過連接池來使用 Jedis。

  • 1.1、Redisson

優點點:分布式鎖,分布式集合,可通過 Redis 支持延遲隊列。

  • 1.3、 Lettuce

用於線程安全同步,異步和響應使用,支持集群,Sentinel,管道和編碼器。

基於 Netty 框架的事件驅動的通信層,其方法調用是異步的。Lettuce 的 API 是線程安全的,所以可以操作單個 Lettuce 連接來完成各種操作。

二、Jedis

三、RedisTemplate

3.1、使用配置

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>

application-dev.yml

spring:
  redis:
    host: 192.168.1.140
    port: 6379
    password:
    database: 15 # 指定redis的分庫(共16個0到15)

3.2、使用示例

@Resource
 private StringRedisTemplate stringRedisTemplate;
 
    @Override
    public CustomersEntity findById(Integer id) {
        // 需要緩存
        // 所有涉及的緩存都需要刪除,或者更新
        try {
            String toString = stringRedisTemplate.opsForHash().get(REDIS_CUSTOMERS_ONE, id + "").toString();
            if (toString != null) {
                return JSONUtil.toBean(toString, CustomersEntity.class);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 緩存為空的時候,先查,然后緩存redis
        Optional<CustomersEntity> byId = customerRepo.findById(id);
        if (byId.isPresent()) {
            CustomersEntity customersEntity = byId.get();
            try {
                stringRedisTemplate.opsForHash().put(REDIS_CUSTOMERS_ONE, id + "", JSONUtil.toJsonStr(customersEntity));
            } catch (Exception e) {
                e.printStackTrace();
            }
            return customersEntity;
        }
        return null;
    }

3.3、擴展

3.3.1、spring-boot-starter-data-redis 的依賴包

8373b23f4150a99b14f9bf0ebbc380f3.png

3.3.2、stringRedisTemplate API(部分展示)

opsForHash --> hash 操作
opsForList --> list 操作
opsForSet --> set 操作
opsForValue --> string 操作
opsForZSet --> Zset 操作
7f43b40ae0f3df3c40ca4e788b08bf5c.png

3.3.3 StringRedisTemplate 默認序列化機制

public class StringRedisTemplate extends RedisTemplate<String, String> {
 
	/**
	 * Constructs a new <code>StringRedisTemplate</code> instance. {@link #setConnectionFactory(RedisConnectionFactory)}
	 * and {@link #afterPropertiesSet()} still need to be called.
	 */
	public StringRedisTemplate() {
		RedisSerializer<String> stringSerializer = new StringRedisSerializer();
		setKeySerializer(stringSerializer);
		setValueSerializer(stringSerializer);
		setHashKeySerializer(stringSerializer);
		setHashValueSerializer(stringSerializer);
	}
}

四、RedissonClient 操作示例

4.1 基本配置

4.1.1、Maven pom 引入

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson</artifactId>
            <version>3.8.2</version>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>LATEST</version>
        </dependency>

4.1.2、添加配置文件 Yaml 或者 json 格式

redisson-config.yml 或者,配置 redisson-config.json

# Redisson 配置
singleServerConfig:
  address: "redis://192.168.1.140:6379"
  password: null
  clientName: null
  database: 15 #選擇使用哪個數據庫0~15
  idleConnectionTimeout: 10000
  pingTimeout: 1000
  connectTimeout: 10000
  timeout: 3000
  retryAttempts: 3
  retryInterval: 1500
  reconnectionTimeout: 3000
  failedAttempts: 3
  subscriptionsPerConnection: 5
  subscriptionConnectionMinimumIdleSize: 1
  subscriptionConnectionPoolSize: 50
  connectionMinimumIdleSize: 32
  connectionPoolSize: 64
  dnsMonitoringInterval: 5000
  #dnsMonitoring: false
 
threads: 0
nettyThreads: 0
codec:
  class: "org.redisson.codec.JsonJacksonCodec"
transportMode: "NIO"

-----------------------

{
  "singleServerConfig": {
    "idleConnectionTimeout": 10000,
    "pingTimeout": 1000,
    "connectTimeout": 10000,
    "timeout": 3000,
    "retryAttempts": 3,
    "retryInterval": 1500,
    "reconnectionTimeout": 3000,
    "failedAttempts": 3,
    "password": null,
    "subscriptionsPerConnection": 5,
    "clientName": null,
    "address": "redis://192.168.1.140:6379",
    "subscriptionConnectionMinimumIdleSize": 1,
    "subscriptionConnectionPoolSize": 50,
    "connectionMinimumIdleSize": 10,
    "connectionPoolSize": 64,
    "database": 0,
    "dnsMonitoring": false,
    "dnsMonitoringInterval": 5000
  },
  "threads": 0,
  "nettyThreads": 0,
  "codec": null,
  "useLinuxNativeEpoll": false
}

4.1.3、讀取配置

新建讀取配置類

@Configuration
public class RedissonConfig {
 
    @Bean
    public RedissonClient redisson() throws IOException {
 
        // 兩種讀取方式,Config.fromYAML 和 Config.fromJSON
//        Config config = Config.fromJSON(RedissonConfig.class.getClassLoader().getResource("redisson-config.json"));
        Config config = Config.fromYAML(RedissonConfig.class.getClassLoader().getResource("redisson-config.yml"));
        return Redisson.create(config);
    }
}

或者,在 application.yml 中配置如下

spring:
  redis:
    redisson:
      config: classpath:redisson-config.yaml

4.2 使用示例

@RestController
@RequestMapping("/")
public class TeController {
 
    @Autowired
    private RedissonClient redissonClient;
 
    static long i = 20;
    static long sum = 300;
 
//    ========================== String =======================
    @GetMapping("/set/{key}")
    public String s1(@PathVariable String key) {
        // 設置字符串
        RBucket<String> keyObj = redissonClient.getBucket(key);
        keyObj.set(key + "1-v1");
        return key;
    }
 
    @GetMapping("/get/{key}")
    public String g1(@PathVariable String key) {
        // 設置字符串
        RBucket<String> keyObj = redissonClient.getBucket(key);
        String s = keyObj.get();
        return s;
    }
 
    //    ========================== hash =======================-=
 
    @GetMapping("/hset/{key}")
    public String h1(@PathVariable String key) {
 
        Ur ur = new Ur();
        ur.setId(MathUtil.randomLong(1,20));
        ur.setName(key);
      // 存放 Hash
        RMap<String, Ur> ss = redissonClient.getMap("UR");
        ss.put(ur.getId().toString(), ur);
        return ur.toString();
    }
 
    @GetMapping("/hget/{id}")
    public String h2(@PathVariable String id) {
        // hash 查詢
        RMap<String, Ur> ss = redissonClient.getMap("UR");
        Ur ur = ss.get(id);
        return ur.toString();
    }
 
    // 查詢所有的 keys
    @GetMapping("/all")
    public String all(){
        RKeys keys = redissonClient.getKeys();
        Iterable<String> keys1 = keys.getKeys();
        keys1.forEach(System.out::println);
        return keys.toString();
    }
 
    // ================== ==============讀寫鎖測試 =============================
 
    @GetMapping("/rw/set/{key}")
    public void rw_set(){
//        RedissonLock.
        RBucket<String> ls_count = redissonClient.getBucket("LS_COUNT");
        ls_count.set("300",360000000l, TimeUnit.SECONDS);
    }
 
    // 減法運算
    @GetMapping("/jf")
    public void jf(){
 
        String key = "S_COUNT";
 
//        RAtomicLong atomicLong = redissonClient.getAtomicLong(key);
//        atomicLong.set(sum);
//        long l = atomicLong.decrementAndGet();
//        System.out.println(l);
 
        RAtomicLong atomicLong = redissonClient.getAtomicLong(key);
        if (!atomicLong.isExists()) {
            atomicLong.set(300l);
        }
 
        while (i == 0) {
            if (atomicLong.get() > 0) {
                long l = atomicLong.getAndDecrement();
                        try {
                            Thread.sleep(1000l);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                i --;
                System.out.println(Thread.currentThread().getName() + "->" + i + "->" + l);
            }
        }
 
 
    }
 
    @GetMapping("/rw/get")
    public String rw_get(){
 
        String key = "S_COUNT";
        Runnable r = new Runnable() {
            @Override
            public void run() {
                RAtomicLong atomicLong = redissonClient.getAtomicLong(key);
                if (!atomicLong.isExists()) {
                    atomicLong.set(300l);
                }
                if (atomicLong.get() > 0) {
                    long l = atomicLong.getAndDecrement();
                    i --;
                    System.out.println(Thread.currentThread().getName() + "->" + i + "->" + l);
                }
            }
        };
 
        while (i != 0) {
            new Thread(r).start();
//            new Thread(r).run();
//            new Thread(r).run();
//            new Thread(r).run();
//            new Thread(r).run();
        }
 
 
        RBucket<String> bucket = redissonClient.getBucket(key);
        String s = bucket.get();
        System.out.println("================線程已結束================================" + s);
 
        return s;
    }
 
}

4.3 擴展

4.3.1 豐富的 jar 支持,尤其是對 Netty NIO 框架

4.3.2 豐富的配置機制選擇,這里是詳細的配置說明

關於序列化機制中,就有很多

dea92190bc0b2cffca6c34b9b9c00764.png
62505939d701b81e01d367ddfd4836d5.png

4.3.3 API 支持(部分展示),具體的 Redis --> RedissonClient , 可查看這里

d51fd6cb8980ec927f8442d6867de04d.png

4.3.4 輕便的豐富的鎖機制的實現

4.3.4.1 Lock

4.3.4.2 Fair Lock

4.3.4.3 MultiLock

4.3.4.4 RedLock

4.3.4.5 ReadWriteLock

4.3.4.6 Semaphore

4.3.4.7 PermitExpirableSemaphore

4.3.4.8 CountDownLatch

五、基於注解實現的 Redis 緩存

5.1 Maven 和 YML 配置

參考 RedisTemplate 配置

另外,還需要額外的配置類

// todo 定義序列化,解決亂碼問題
@EnableCaching
@Configuration
@ConfigurationProperties(prefix = "spring.cache.redis")
public class RedisCacheConfig {
 
    private Duration timeToLive = Duration.ZERO;
 
    public void setTimeToLive(Duration timeToLive) {
        this.timeToLive = timeToLive;
    }
 
    @Bean
    public CacheManager cacheManager(RedisConnectionFactory factory) {
        RedisSerializer<String> redisSerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
 
        // 解決查詢緩存轉換異常的問題
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(om);
 
        // 配置序列化(解決亂碼的問題)
        RedisCacheConfiguration config = RedisCacheConfiguration.defaultCacheConfig()
                .entryTtl(timeToLive)
                .serializeKeysWith(RedisSerializationContext.SerializationPair.fromSerializer(redisSerializer))
                .serializeValuesWith(RedisSerializationContext.SerializationPair.fromSerializer(jackson2JsonRedisSerializer))
                .disableCachingNullValues();
 
        RedisCacheManager cacheManager = RedisCacheManager.builder(factory)
                .cacheDefaults(config)
                .build();
        return cacheManager;
    }
 
}

5.2 使用示例

@Transactional
@Service
public class ReImpl implements RedisService {
 
    @Resource
    private CustomerRepo customerRepo;
    @Resource
    private StringRedisTemplate stringRedisTemplate;
 
    public static final String REDIS_CUSTOMERS_ONE = "Customers";
 
    public static final String REDIS_CUSTOMERS_ALL = "allList";
 
    // =====================================================================使用Spring cahce 注解方式實現緩存
    // ==================================單個操作
 
    @Override
    @Cacheable(value = "cache:customer", unless = "null == #result",key = "#id")
    public CustomersEntity cacheOne(Integer id) {
        final Optional<CustomersEntity> byId = customerRepo.findById(id);
        return byId.isPresent() ? byId.get() : null;
    }
 
    @Override
    @Cacheable(value = "cache:customer", unless = "null == #result", key = "#id")
    public CustomersEntity cacheOne2(Integer id) {
        final Optional<CustomersEntity> byId = customerRepo.findById(id);
        return byId.isPresent() ? byId.get() : null;
    }
 
     // todo 自定義redis緩存的key,
    @Override
    @Cacheable(value = "cache:customer", unless = "null == #result", key = "#root.methodName + '.' + #id")
    public CustomersEntity cacheOne3(Integer id) {
        final Optional<CustomersEntity> byId = customerRepo.findById(id);
        return byId.isPresent() ? byId.get() : null;
    }
 
    // todo 這里緩存到redis,還有響應頁面是String(加了很多轉義符\,),不是Json格式
    @Override
    @Cacheable(value = "cache:customer", unless = "null == #result", key = "#root.methodName + '.' + #id")
    public String cacheOne4(Integer id) {
        final Optional<CustomersEntity> byId = customerRepo.findById(id);
        return byId.map(JSONUtil::toJsonStr).orElse(null);
    }
 
     // todo 緩存json,不亂碼已處理好,調整序列化和反序列化
    @Override
    @Cacheable(value = "cache:customer", unless = "null == #result", key = "#root.methodName + '.' + #id")
    public CustomersEntity cacheOne5(Integer id) {
        Optional<CustomersEntity> byId = customerRepo.findById(id);
        return byId.filter(obj -> !StrUtil.isBlankIfStr(obj)).orElse(null);
    }
 
 
 
    // ==================================刪除緩存
    @Override
    @CacheEvict(value = "cache:customer", key = "'cacheOne5' + '.' + #id")
    public Object del(Integer id) {
        // 刪除緩存后的邏輯
        return null;
    }
 
    @Override
    @CacheEvict(value = "cache:customer",allEntries = true)
    public void del() {
 
    }
 
    @CacheEvict(value = "cache:all",allEntries = true)
    public void delall() {
 
    }
    // ==================List操作
 
    @Override
    @Cacheable(value = "cache:all")
    public List<CustomersEntity> cacheList() {
        List<CustomersEntity> all = customerRepo.findAll();
        return all;
    }
 
    // todo 先查詢緩存,再校驗是否一致,然后更新操作,比較實用,要清楚緩存的數據格式(明確業務和緩存模型數據)
    @Override
    @CachePut(value = "cache:all",unless = "null == #result",key = "#root.methodName")
    public List<CustomersEntity> cacheList2() {
        List<CustomersEntity> all = customerRepo.findAll();
        return all;
    }
 
}

5.3 擴展

基於 spring 緩存實現
86ab93d5014c7efe76191358e908cb54.png


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM