作者:菜蚜
my.oschina.net/wnjustdoit/blog/1606215
前言:在分布式環境中,我們經常使用鎖來進行並發控制,鎖可分為樂觀鎖和悲觀鎖,
基於數據庫版本戳的實現是樂觀鎖,基於redis或zookeeper的實現可認為是悲觀鎖了。樂觀鎖和悲觀鎖最根本的區別在於線程之間是否相互阻塞。
那么,本文主要來討論基於redis的分布式鎖算法問題。
從2.6.12版本開始,redis為SET命令增加了一系列選項(set [key] NX/XX EX/PX [expiration]):
-
EX seconds – 設置鍵key的過期時間,單位時秒
-
PX milliseconds – 設置鍵key的過期時間,單位時毫秒
-
NX – 只有鍵key不存在的時候才會設置key的值
-
XX – 只有鍵key存在的時候才會設置key的值
原文地址:https://redis.io/commands/set
中文地址:http://redis.cn/commands/set.html
注意: 由於SET命令加上選項已經可以完全取代SETNX, SETEX, PSETEX的功能,所以在將來的版本中,redis可能會不推薦使用並且最終拋棄這幾個命令。
這里簡單提一下,在舊版本的redis中(指2.6.12版本之前),使用redis實現分布式鎖一般需要setNX、expire、getSet、del等命令。而且會發現這種實現有很多邏輯判斷的原子操作以及本地時間等並沒有控制好。
而在舊版本的redis中,redis的超時時間很難控制,用戶迫切需要把setNX和expiration結合為一體的命令,把他們作為一個原子操作,這樣新版本的多選項set命令誕生了。然而這並沒有完全解決復雜的超時控制帶來的問題。
接下來,我們的一切討論都基於新版redis。
在這里,我先提出幾個在實現redis分布式鎖中需要考慮的關鍵問題
1、死鎖問題;
1.1、為了防止死鎖,redis至少需要設置一個超時時間;
1.2、由1.1引申出來,當鎖自動釋放了,但是程序並沒有執行完畢,這時候其他線程又獲取到鎖執行同樣的程序,可能會造成並發問題,這個問題我們需要考慮一下是否歸屬於分布式鎖帶來問題的范疇。
2、鎖釋放問題,這里會有兩個問題;
2.1、每個獲取redis鎖的線程應該釋放自己獲取到的鎖,而不是其他線程的,所以我們需要在每個線程獲取鎖的時候給鎖做上不同的標記以示區分;
2.2、由2.1帶來的問題是線程在釋放鎖的時候需要判斷當前鎖是否屬於自己,如果屬於自己才釋放,這里涉及到邏輯判斷語句,至少是兩個操作在進行,那么我們需要考慮這兩個操作要在一個原子內執行,否者在兩個行為之間可能會有其他線程插入執行,導致程序紊亂。
3、更可靠的鎖;
單實例的redis(這里指只有一個master節點)往往是不可靠的,雖然實現起來相對簡單一些,但是會面臨着宕機等不可用的場景,即使在主從復制的時候也顯得並不可靠(因為redis的主從復制往往是異步的)。
關於Martin Kleppmann的Redlock的分析
原文地址:https://redis.io/topics/distlock
中文地址:http://redis.cn/topics/distlock.html
文章分析得出,這種算法只需具備3個特性就可以實現一個最低保障的分布式鎖。
-
安全屬性(Safety property): 獨享(相互排斥)。在任意一個時刻,只有一個客戶端持有鎖。
-
活性A(Liveness property A): 無死鎖。即便持有鎖的客戶端崩潰(crashed)或者網絡被分裂(gets partitioned),鎖仍然可以被獲取。
-
活性B(Liveness property B): 容錯。只要大部分Redis節點都活着,客戶端就可以獲取和釋放鎖.
我們來分析一下:
第一點安全屬性意味着悲觀鎖(互斥鎖)是我們做redis分布式鎖的前提,否者將可能造成並發;
第二點表明為了避免死鎖,我們需要設置鎖超時時間,保證在一定的時間過后,鎖可以重新被利用;
第三點是說對於客戶端來說,獲取鎖和手動釋放鎖可以有更高的可靠性。
更進一步分析,結合上文提到的關鍵問題,這里可以引申出另外的兩個問題:
-
怎么才能合理判斷程序真正處理的有效時間范圍?(這里有個時間偏移的問題)
-
redis Master節點宕機后恢復(可能還沒有持久化到磁盤)、主從節點切換,(N/2)+1這里的N應該怎么動態計算更合理?
接下來再看,redis之父antirez對Redlock的評價
原文地址:http://antirez.com/news/101
文中主要提到了網絡延遲和本地時鍾的修改(不管是時間服務器或人為修改)對這種算法可能造成的影響。
最后,來點實踐吧
I、傳統的單實例redis分布式鎖實現(關鍵步驟)
獲取鎖(含自動釋放鎖):
SET resource_name my_random_value NX PX 30000
手動刪除鎖(Lua腳本):
if redis.call("get",KEYS[1]) == ARGV[1] then
return redis.call("del",KEYS[1])
else
return 0
end
II、分布式環境的redis(多master節點)的分布式鎖實現
為了保證在盡可能短的時間內獲取到(N/2)+1個節點的鎖,可以並行去獲取各個節點的鎖(當然,並行可能需要消耗更多的資源,因為串行只需要count到足夠數量的鎖就可以停止獲取了);
另外,怎么動態實時統一獲取redis master nodes需要更進一步去思考了。
QA,補充一下說明(以下為我與朋友溝通的情況,以說明文中大家可能不夠明白的地方):
1、在關鍵問題2.1中,刪除就刪除了,會造成什么問題?
線程A超時,准備刪除鎖;但此時的鎖屬於線程B;線程B還沒執行完,線程A把鎖刪除了,這時線程C獲取到鎖,同時執行程序;所以不能亂刪。
2、在關鍵問題2.2中,只要在key生成時,跟線程相關就不用考慮這個問題了嗎?
不同的線程執行程序,線程之間肯雖然有差異呀,然后在redis鎖的value設置有線程信息,比如線程id或線程名稱,是分布式環境的話加個機器id前綴咯(類似於twitter的snowflake算法!),但是在del命令只會涉及到key,不會再次檢查value,所以還是需要lua腳本控制if(condition){xxx}的原子性。
3、那要不要考慮鎖的重入性?
不需要重入;try…finally 沒得重入的場景;對於單個線程來說,執行是串行的,獲取鎖之后必定會釋放,因為finally的代碼必定會執行啊(只要進入了try塊,finally必定會執行)。
4、為什么兩個線程都會去刪除鎖?(貌似重復的問題。不管怎樣,還是耐心解答吧)
每個線程只能管理自己的鎖,不能管理別人線程的鎖啊。這里可以聯想一下ThreadLocal。
5、如果加鎖的線程掛了怎么辦?只能等待自動超時?
看你怎么寫程序的了,一種是問題3的回答;另外,那就自動超時咯。這種情況也適用於網絡over了。
6、時間太長,程序異常就會蛋疼,時間太短,就會出現程序還沒有處理完就超時了,這豈不是很尷尬?
是呀,所以需要更好的衡量這個超時時間的設置。
實踐部分主要代碼:
RedisLock工具類:
package com.caiya.cms.web.component;
import com.caiya.cache.CacheException;
import com.caiya.cache.redis.JedisCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/**
* redis實現分布式鎖
* 可實現特性:
* 1、使多線程無序排隊獲取和釋放鎖;
* 2、丟棄未成功獲得鎖的線程處理;
* 3、只釋放線程本身加持的鎖;
* 4、避免死鎖
*
* @author wangnan
* @since 1.0
*/
public final class RedisLock {
private static final Logger logger = LoggerFactory.getLogger(RedisLock.class);
/**
* 嘗試加鎖(僅一次)
*
* @param lockKey 鎖key
* @param lockValue 鎖value
* @param expireSeconds 鎖超時時間(秒)
* @return 是否加鎖成功
* @throws CacheException
*/
public static boolean tryLock(String lockKey, String lockValue, long expireSeconds) throws CacheException {
JedisCache jedisCache = JedisCacheFactory.getInstance().getJedisCache();
try {
String response = jedisCache.set(lockKey, lockValue, "nx", "ex", expireSeconds);
return Objects.equals(response, "OK");
} finally {
jedisCache.close();
}
}
/**
* 加鎖(指定最大嘗試次數范圍內)
*
* @param lockKey 鎖key
* @param lockValue 鎖value
* @param expireSeconds 鎖超時時間(秒)
* @param tryTimes 最大嘗試次數
* @param sleepMillis 每兩次嘗試之間休眠時間(毫秒)
* @return 是否加鎖成功
* @throws CacheException
*/
public static boolean lock(String lockKey, String lockValue, long expireSeconds, int tryTimes, long sleepMillis) throws CacheException {
boolean result;
int count = 0;
do {
count++;
result = tryLock(lockKey, lockValue, expireSeconds);
try {
TimeUnit.MILLISECONDS.sleep(sleepMillis);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
} while (!result && count <= tryTimes);
return result;
}
/**
* 釋放鎖
*
* @param lockKey 鎖key
* @param lockValue 鎖value
*/
public static void unlock(String lockKey, String lockValue) {
JedisCache jedisCache = JedisCacheFactory.getInstance().getJedisCache();
try {
String luaScript = "if redis.call('get',KEYS[1]) == ARGV[1] then return redis.call('del',KEYS[1]) else return 0 end";
Object result = jedisCache.eval(luaScript, 1, lockKey, lockValue);
// Objects.equals(result, 1L);
} catch (Exception e) {
logger.error(e.getMessage(), e);
} finally {
jedisCache.close();
}
// return false;
}
private RedisLock() {
}
}
使用工具類的代碼片段1:
...
String lockKey = Constant.DEFAULT_CACHE_NAME + ":addItemApply:" + applyPriceDTO.getItemId() + "_" + applyPriceDTO.getSupplierId();// 跟業務相關的唯一拼接鍵
String lockValue = Constant.DEFAULT_CACHE_NAME + ":" + System.getProperty("JvmId") + ":" + Thread.currentThread().getName() + ":" + System.currentTimeMillis();// 生成集群環境中的唯一值
boolean locked = RedisLock.tryLock(lockKey, lockValue, 100);// 只嘗試一次,在本次處理過程中直接拒絕其他線程的請求
if (!locked) {
throw new IllegalAccessException("您的操作太頻繁了,休息一下再來吧~");
}
try {
// 開始處理核心業務邏輯
Item item = itemService.queryItemByItemId(applyPriceDTO.getItemId());
...
...
} finally {
RedisLock.unlock(lockKey, lockValue);// 在finally塊中釋放鎖
}
使用工具類的代碼片段2:
...
String lockKey = Constant.DEFAULT_CACHE_NAME + ":addItemApply:" + applyPriceDTO.getItemId() + "_" + applyPriceDTO.getSupplierId();
String lockValue = Constant.DEFAULT_CACHE_NAME + ":機器編號:" + Thread.currentThread().getName() + ":" + System.currentTimeMillis();
boolean locked = RedisLock.lock(lockKey, lockValue, 100, 20, 100);// 非公平鎖,無序競爭(這里需要合理根據業務處理情況設置最大嘗試次數和每次休眠時間)
if (!locked) {
throw new IllegalAccessException("系統太忙,本次操作失敗");// 一般來說,不會走到這一步;如果真的有這種情況,並且在合理設置鎖嘗試次數和等待響應時間之后仍然處理不過來,可能需要考慮優化程序響應時間或者用消息隊列排隊執行了
}
try {
// 開始處理核心業務邏輯
Item item = itemService.queryItemByItemId(applyPriceDTO.getItemId());
...
...
} finally {
RedisLock.unlock(lockKey, lockValue);
}
...
附加:
基於redis的分布式鎖實現客戶端Redisson:
https://github.com/redisson/redisson/wiki/8.-Distributed-locks-and-synchronizers
基於zookeeper的分布式鎖實現:
http://curator.apache.org/curator-recipes/shared-reentrant-lock.html
推薦去我的博客閱讀更多:
2.Spring MVC、Spring Boot、Spring Cloud 系列教程
3.Maven、Git、Eclipse、Intellij IDEA 系列工具教程
覺得不錯,別忘了點贊+轉發哦!