一. Redis分布式鎖剖析
1. 背景
在傳統的單體項目中,即部署到單個IIS上,針對並發問題,比如進銷存中的出庫和入庫問題,多個人同時操作,屬於一個IIS進程中多個線程並發操作的問題,這個時候可以引入線程鎖lock/Monitor等,輕松解決這類問題。但是隨着業務量的逐漸增大,比如"秒殺業務", 肯定是集群部署,這個時候線程鎖已經沒用了, 必須引入分布式鎖。
常見的分布式鎖有:數據庫、zookeeper、redis. 本節重點介紹redis的分布式鎖.
如下圖:

參考:https://segmentfault.com/a/1190000018106844
https://www.cnblogs.com/runningsmallguo/p/10322315.html
2. 分布式鎖需要滿足的條件
(1).在分布式系統環境下,一個鎖在同一時間只能被一個服務器獲取;(這是所有分布式鎖的基礎)
(2).高性能的獲取鎖和釋放鎖;(鎖用完了,要及時釋放,以供別人繼續使用)
(3).具備鎖失效機制,防止死鎖;(防止因為某些意外,鎖沒有得到釋放,那別人將永遠無法使用)
(4).具備非阻塞鎖特性,即沒有獲取到鎖將直接返回獲取鎖失敗。(滿足等待鎖的同時,也要滿足非阻塞鎖特性,便於多樣性的業務場景使用)
3. 分布式鎖種類/原理
(1).阻塞鎖
嘗試在redis中創建一個字符串結構緩存,方法傳入key和過期時間(AcquireLock), 其中key對應的value為鎖的過期時間timeout的時間戳。
若redis中沒有這個key,則創建成功(即搶到鎖),然后立即返回;若已經有這個key,則先watch,然后校驗value中的時間戳是否已經超過當前時間。
若已超過,則嘗試使用提交事務的方式覆蓋新的時間戳,事務提交成功(即搶到鎖),然后立即返回;若未超過當前時間或事務提交失敗(即被別人搶到鎖),則進入一個內部優化過的微循環,不斷重試。
傳入的timeout還有一個作用,就是控制重試時間,重試超時后則拋異常,using完成方法調用或者顯式調用dispose,都會直接清除key。
總結:
timeout有兩個意思:一是如果成功加鎖后鎖的過期時間, 二是未成功加鎖后阻塞等待的時間。數據鎖服務通過檢查value中時間戳來判斷是否過期,並不是利用redis在key上設置expire time來通過key的過期實現的。
(2).非阻塞鎖
嘗試在redis中創建一個字符串結構緩存項,方法傳入key、value、timeout(Add),其中value無實際意義,過期時間為傳入的timeout。
若redis中沒有這個key,則創建成功(即搶到鎖),然后立即返回true.若已經有這個key,則立即返回false。以上過程為全局單線程原子操作,整個過程為獨占式操作。IsLock可以檢測key是否存在。
注意:
timeout即成功加鎖后鎖的過期時間,利用redis在key上設置expire time來通過key的過期實現。不要先用IsLock判斷是否有鎖再用Add加鎖,因為這兩個操作非原子性操作,期間會被其他操作干擾。
(3).底層實現主要用到以下幾個指令
A.setnx
setnx key val:當且僅當key不存在時,set一個key為val的字符串,返回1;若key存在,則什么都不做,返回0
B.expire
expire key timeout:為key設置一個超時時間,單位為second,超過這個時間鎖會自動釋放,避免死鎖
C.delete
delete key:刪除key
二. 案例模擬實現
1.場景模擬分析
模擬多個用戶進行秒殺業務,扣減庫存→創建訂單。 (PS:這里只是為了演示分布式鎖而已,實際場景可以利用redis自減Api原子性實現扣減庫存,從而干掉鎖的問題)
總結:真正的秒殺是不會用分布式鎖的, 因為用鎖會存在等待的問題,會產生大量無響應的情況, 實際情況下可以利用Lua腳本結合redis原子性的特點,編寫秒殺業務。詳見:https://www.cnblogs.com/yaopengfei/p/13749772.html
下面分享3個不同的程序集實現分布式鎖的業務.
2. ServiceStack.Redis
(1).阻塞鎖
代碼分享
/// <summary> /// 阻塞鎖 /// </summary> public class BlockingLock { public static void Show(int i, string key, TimeSpan timeout) { using var client = new RedisClient("119.45.174.249", 6379, "123456"); using (var myLock = client.AcquireLock(key, timeout)) //獲取鎖 (此處阻塞,其它線程等待) { var goodNum = client.Get<int>("goodNum"); if (goodNum > 0) { client.Set<int>("goodNum", goodNum - 1); //扣減庫存 var orderNum = client.Incr("orderNum"); Console.WriteLine($"{i}搶購成功,此時的庫存為{goodNum - 1},訂單數量為:{orderNum}"); } else { Console.WriteLine($"商品已經賣光了"); } } } }
調用
static void Main(string[] args) { Console.WriteLine("請輸入開始搶購的時間:"); int minute = int.Parse(Console.ReadLine()); using var client = new RedisClient("119.45.174.249", 6379, "123456"); //商品數量(設置為10) client.Set<int>("goodNum", 10); //訂單數(默認為0) client.Set<int>("orderNum", 0); //開啟30個線程去搶購 Console.WriteLine($"在{minute}分0秒正式開啟秒殺!"); var flag = true; while (flag) { if (DateTime.Now.Minute == minute) //if (true) { flag = false; Parallel.For(0, 30, (i) => { int temp = i; Task.Run(() => { BlockingLock.Show(i, "akey", TimeSpan.FromSeconds(100)); //阻塞鎖 //NoBlockingLock.Show(i, "akey", TimeSpan.FromSeconds(100)); //非阻塞鎖 }); }); } } Console.ReadKey(); }
copy兩套程序同時運行

(2).非阻塞鎖
代碼分享:
/// <summary> /// 非阻塞鎖 /// </summary> public class NoBlockingLock { public static void Show(int i, string key, TimeSpan timeout) { using var client = new RedisClient("119.45.174.249", 6379, "123456"); bool isLocked = client.Add<string>(key, "xxxx", timeout); if (isLocked) { try { var goodNum = client.Get<int>("goodNum"); if (goodNum > 0) { client.Set<int>("goodNum", goodNum - 1); //扣減庫存 var orderNum = client.Incr("orderNum"); //訂單數量自增1 Console.WriteLine($"{i}搶購成功,此時的庫存為{goodNum - 1},訂單數量為:{orderNum}"); } else { Console.WriteLine($"{i}商品已經賣光了"); } } catch (Exception ex) { Console.WriteLine($"{i}報錯了{ex.Message}"); } finally { client.Remove(key); } } else { Console.WriteLine($"{i}搶購失敗:原因:沒有拿到鎖"); } } }
調用
static void Main(string[] args) { Console.WriteLine("請輸入開始搶購的時間:"); int minute = int.Parse(Console.ReadLine()); using var client = new RedisClient("119.45.174.249", 6379, "123456"); //商品數量(設置為10) client.Set<int>("goodNum", 10); //訂單數(默認為0) client.Set<int>("orderNum", 0); //開啟30個線程去搶購 Console.WriteLine($"在{minute}分0秒正式開啟秒殺!"); var flag = true; while (flag) { if (DateTime.Now.Minute == minute) //if (true) { flag = false; Parallel.For(0, 30, (i) => { int temp = i; Task.Run(() => { //BlockingLock.Show(i, "akey", TimeSpan.FromSeconds(100)); //阻塞鎖 NoBlockingLock.Show(i, "akey", TimeSpan.FromSeconds(100)); //非阻塞鎖 }); }); } } Console.ReadKey(); }
copy兩套程序同時運行

3. StackExchange.Redis
代碼分享
public class MyLock1 { public static void Show(int i, string key, TimeSpan timeout) { RedisHelp redis = new RedisHelp("119.45.174.249:6379,password=123456"); var client = redis.GetDatabase(); bool isLocked = client.LockTake(key, Environment.MachineName, timeout); //timeout秒后自動釋放 if (isLocked) { try { var goodNum = int.Parse(client.StringGet("goodNum")); if (goodNum > 0) { client.StringSet("goodNum", goodNum - 1); //扣減庫存 var orderNum = client.StringIncrement("orderNum"); //訂單數量自增1 Console.WriteLine($"{i}搶購成功,此時的庫存為{goodNum - 1},訂單數量為:{orderNum}"); } else { Console.WriteLine($"{i}商品已經賣光了"); } } catch (Exception ex) { Console.WriteLine($"{i}報錯了{ex.Message}"); } finally { client.LockRelease(key, Environment.MachineName); } } else { Console.WriteLine($"{i}搶購失敗:原因:沒有拿到鎖"); } } }
調用:
static void Main(string[] args) { Console.WriteLine("請輸入開始搶購的時間:"); int minute = int.Parse(Console.ReadLine()); RedisHelp redis = new RedisHelp("119.45.174.249:6379,password=123456"); var client = redis.GetDatabase(); //商品數量(設置為10) client.StringSet("goodNum", 10); //訂單數(默認為0) client.StringSet("orderNum", 0); //開啟30個線程去搶購 Console.WriteLine($"在{minute}分0秒正式開啟秒殺!"); var flag = true; while (flag) { if (DateTime.Now.Minute == minute) //if (true) { flag = false; Parallel.For(0, 30, (i) => { int temp = i; Task.Run(() => { MyLock1.Show(i, "akey", TimeSpan.FromSeconds(2)); }); }); } } Console.ReadKey(); }
4. CSRedisCore
代碼分享
public class MyLock1 { public static void Show(int i, string key, int timeout) { RedisHelper.Initialization(new CSRedis.CSRedisClient("119.45.174.249:6379,password=123456,defaultDatabase=0")); var isLocked = RedisHelper.Lock(key, timeout, true); //timeout秒后自動釋放 if (isLocked != null) //獲取超時則返回null { try { var goodNum = int.Parse(RedisHelper.Get("goodNum")); if (goodNum > 0) { RedisHelper.Set("goodNum", goodNum - 1); //扣減庫存 var orderNum = RedisHelper.IncrBy("orderNum"); //訂單數量自增1 Console.WriteLine($"{i}搶購成功,此時的庫存為{goodNum - 1},訂單數量為:{orderNum}"); } else { Console.WriteLine($"商品已經賣光了"); } } catch (Exception ex) { Console.WriteLine($"報錯了{ex.Message}"); } finally { RedisHelper.Del(key); //上面可以自動刪除,還需要手動刪除嗎? } } else { Console.WriteLine($"{i}搶購失敗:原因:沒有拿到鎖"); } } }
調用
static void Main(string[] args) { Console.WriteLine("請輸入開始搶購的時間:"); int minute = int.Parse(Console.ReadLine()); var client = new CSRedis.CSRedisClient("119.45.174.249:6379,password=123456,defaultDatabase=0"); //商品數量(設置為10) client.Set("goodNum", 10); //訂單數(默認為0) client.Set("orderNum", 0); //開啟30個線程去搶購 Console.WriteLine($"在{minute}分0秒正式開啟秒殺!"); var flag = true; while (flag) { if (DateTime.Now.Minute == minute) //if (true) { flag = false; Parallel.For(0, 30, (i) => { int temp = i; Task.Run(() => { MyLock1.Show(i, "akey", 2); }); }); } } Console.ReadKey(); }
5. RedLock分布式鎖
使用程序集【RedLock.net】,內部還是基於【StackExchange.Redis】來實現的。
(GitHub:https://github.com/samcook/RedLock.net)
{
var existingConnectionMultiplexer1 = ConnectionMultiplexer.Connect("47.101.216.xx:6379,password=xxx");
var multiplexers = new List<RedLockMultiplexer> { existingConnectionMultiplexer1, };
var redlockFactory = RedLockFactory.Create(multiplexers);
var lockKey = "ypf1234567";
var expiry = TimeSpan.FromSeconds(30);
using var redLock = await redlockFactory.CreateLockAsync(lockKey, expiry); // there are also non async Create() methods
if (redLock.IsAcquired)
{
//表示獲取到鎖了, 下面寫需要鎖定的業務
Console.WriteLine("表示獲取到鎖了, 下面寫需要鎖定的業務");
}
else
{
//獲取鎖失敗
Console.WriteLine("獲取鎖失敗");
}
Console.ReadKey();
}
!
- 作 者 : Yaopengfei(姚鵬飛)
- 博客地址 : http://www.cnblogs.com/yaopengfei/
- 聲 明1 : 如有錯誤,歡迎討論,請勿謾罵^_^。
- 聲 明2 : 原創博客請在轉載時保留原文鏈接或在文章開頭加上本人博客地址,否則保留追究法律責任的權利。
