spring boot項目之redis分布式鎖的應用


SETNX key value

key設置值為value,如果key不存在,這種情況下等同SET命令。 當key存在時,什么也不做。SETNX是”SET if Not eXists”的簡寫。

返回值

Integer reply, 特定值:

  • 1 如果key被設置了
  • 0 如果key沒有被設置

##例子

redis> SETNX mykey "Hello"
(integer) 1
redis> SETNX mykey "World"
(integer) 0
redis> GET mykey
"Hello"
redis> 

Design pattern: Locking with !SETNX

設計模式:使用!SETNX加鎖

Please note that:

請注意:

  1. 不鼓勵以下模式來實現the Redlock algorithm ,該算法實現起來有一些復雜,但是提供了更好的保證並且具有容錯性。

  2. 無論如何,我們保留舊的模式,因為肯定存在一些已實現的方法鏈接到該頁面作為引用。而且,這是一個有趣的例子說明Redis命令能夠被用來作為編程原語的。

  3. 無論如何,即使假設一個單例的加鎖原語,但是從 2.6.12 開始,可以創建一個更加簡單的加鎖原語,相當於使用SET命令來獲取鎖,並且用一個簡單的 Lua 腳本來釋放鎖。該模式被記錄在SET命令的頁面中。

也就是說,SETNX能夠被使用並且以前也在被使用去作為一個加鎖原語。例如,獲取鍵為foo的鎖,客戶端可以嘗試一下操作:

SETNX lock.foo <current Unix time + lock timeout + 1>

如果客戶端獲得鎖,SETNX返回1,那么將lock.foo鍵的Unix時間設置為不在被認為有效的時間。客戶端隨后會使用DEL lock.foo去釋放該鎖。

如果SETNX返回0,那么該鍵已經被其他的客戶端鎖定。如果這是一個非阻塞的鎖,才能立刻返回給調用者,或者嘗試重新獲取該鎖,直到成功或者過期超時。

處理死鎖

以上加鎖算法存在一個問題:如果客戶端出現故障,崩潰或者其他情況無法釋放該鎖會發生什么情況?這是能夠檢測到這種情況,因為該鎖包含一個Unix時間戳,如果這樣一個時間戳等於當前的Unix時間,該鎖將不再有效。

當以下這種情況發生時,我們不能調用DEL來刪除該鎖,並且嘗試執行一個SETNX,因為這里存在一個競態條件,當多個客戶端察覺到一個過期的鎖並且都嘗試去釋放它。

  • C1 和 C2 讀lock.foo檢查時間戳,因為他們執行完SETNX后都被返回了0,因為鎖仍然被 C3 所持有,並且 C3 已經崩潰。
  • C1 發送DEL lock.foo
  • C1 發送SETNX lock.foo命令並且成功返回
  • C2 發送DEL lock.foo
  • C2 發送SETNX lock.foo命令並且成功返回
  • 錯誤:由於競態條件導致 C1 和 C2 都獲取到了鎖

幸運的是,可以使用以下的算法來避免這種情況,請看 C4 客戶端所使用的好的算法:

  • C4 發送SETNX lock.foo為了獲得該鎖
  • 已經崩潰的客戶端 C3 仍然持有該鎖,所以Redis將會返回0給 C4
  • C4 發送GET lock.foo檢查該鎖是否已經過期。如果沒有過期,C4 客戶端將會睡眠一會,並且從一開始進行重試操作
  • 另一種情況,如果因為 lock.foo鍵的Unix時間小於當前的Unix時間而導致該鎖已經過期,C4 會嘗試執行以下的操作:

    GETSET lock.foo <current Unix timestamp + lock timeout + 1>
    
  • 由於GETSET 的語意,C4會檢查已經過期的舊值是否仍然存儲在lock.foo中。如果是的話,C4 會獲得鎖
  • 如果另一個客戶端,假如為 C5 ,比 C4 更快的通過GETSET操作獲取到鎖,那么 C4 執行GETSET操作會被返回一個不過期的時間戳。C4 將會從第一個步驟重新開始。請注意:即使 C4 在將來幾秒設置該鍵,這也不是問題。

為了使這種加鎖算法更加的健壯,持有鎖的客戶端應該總是要檢查是否超時,保證使用DEL釋放鎖之前不會過期,因為客戶端故障的情況可能是復雜的,不止是崩潰,還會阻塞一段時間,阻止一些操作的執行,並且在阻塞恢復后嘗試執行DEL(此時,該LOCK已經被其他客戶端所持有)

 

 

GETSET key value

自動將key對應到value並且返回原來key對應的value。如果key存在但是對應的value不是字符串,就返回錯誤。

設計模式

GETSET可以和INCR一起使用實現支持重置的計數功能。舉個例子:每當有事件發生的時候,一段程序都會調用INCR給key mycounter加1,但是有時我們需要獲取計數器的值,並且自動將其重置為0。這可以通過GETSET mycounter “0”來實現:

INCR mycounter
GETSET mycounter "0"
GET mycounter

返回值

bulk-string-reply: 返回之前的舊值,如果之前Key不存在將返回nil

例子

redis> INCR mycounter
(integer) 1
redis> GETSET mycounter "0"
"1"
redis> GET mycounter
"0"
redis> 


那么,我們在處理高並發秒殺下單時,就可以通過redis來實現加鎖和解鎖的功能來做到

一、上鎖
package com.imooc.service;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;
@Component
@Slf4j
public class RedisLock {

@Autowired
private StringRedisTemplate redisTemplate;

/**
* 加鎖
* @param key
* @param value 當前時間+超時時間
* @return
*/
public boolean lock(String key, String value) {
if(redisTemplate.opsForValue().setIfAbsent(key, value)) {
return true;
}
//currentValue=A 這兩個線程的value都是B 其中一個線程拿到鎖
String currentValue = redisTemplate.opsForValue().get(key);
//如果鎖過期
if (!StringUtils.isEmpty(currentValue)
&& Long.parseLong(currentValue) < System.currentTimeMillis()) {
//獲取上一個鎖的時間
String oldValue = redisTemplate.opsForValue().getAndSet(key, value);
if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
return true;
}
}

return false;
}

/**
* 解鎖
* @param key
* @param value
*/
public void unlock(String key, String value) {
try {
String currentValue = redisTemplate.opsForValue().get(key);
if (!StringUtils.isEmpty(currentValue) && currentValue.equals(value)) {
redisTemplate.opsForValue().getOperations().delete(key);
}
}catch (Exception e) {
log.error("【redis分布式鎖】解鎖異常, {}", e);
}
}

}
在Redis.Lock()方法上為什么要加上后面這段代碼呢?

這是因為如果沒有這段代碼,那么在加鎖過程中,如果鎖順利加上了,在接下來下單過程中如果出現了一些錯誤,

就會拋出異常,會導致這個解鎖步驟不會執行,正是因為沒有解鎖,那么下次請求進來時,因為有鎖的存在,那么

if(redisTemplate.opsForValue().setIfAbsent(key, value)) {
return true;
}
這個方法不會返回true,直接返回false,那么就會一直停留在加鎖這一步驟,造成死鎖的局面。那么如果加上這段代碼,會有什么作用呢?
第一,他可以通過這段代碼,在舊鎖過期時,順利拿到鎖。
第二,接下來這段假設很重要!!!
假設兩個線程同時進入加鎖這一步驟,如果之前鎖已經被占用了,那么他們肯定都不會到
if(redisTemplate.opsForValue().setIfAbsent(key, value)) {
return true;
}
return true的這一步,直接往下走,假設currentValue取出的值是A,這兩個線程的value都是B,假設鎖已經過期了
(因為如果鎖還沒過期,那么肯定是返回false的,那么上鎖就不會成功),

那么接下來就會進入這段代碼,
if (!StringUtils.isEmpty(currentValue)
&& Long.parseLong(currentValue) < System.currentTimeMillis()) {
//獲取上一個鎖的時間
String oldValue = redisTemplate.opsForValue().getAndSet(key, value);
if (!StringUtils.isEmpty(oldValue) && oldValue.equals(currentValue)) {
return true;
}
}
這段代碼只會有一個線程去執行,總歸有個先后。假設第一個線程拿到了,那么oldValue的值就是A(已經過期鎖線程的值),
那么當前的currentValue也是A,相等返回true,這個時候第一個線程就拿到了這個鎖,那

當第二個線程拿到oldValue時,它的值就成了B,所以它返回false,所以沒有拿到這把鎖,結果就是只有一個線程拿到了這把鎖。
所以,它既解決了死鎖的問題,又讓多線程訪問時,只讓一個線程拿到鎖。
二、解鎖

 

三、運用

Redis能夠完成分布式鎖的很大原因是因為它是單線程的。

 

 
           
 
          


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM