【本博客屬於原創,如需轉載,請注明出處:https://www.cnblogs.com/gdouzz/p/12097968.html】
最近研究庫存的相關,在高峰期經常出現超賣等等情況,最后根據采用是基於Redis來實現了分布式鎖,特此拿出來和大家分享。
准備工作:centos7,Redis,Nginx,以及JMeter測試工具。
分布式鎖的引出
在傳統的程序中,我們寫了如下最簡單對庫存操作的代碼如下:
下面是基於AspNetCore.WebAPI 創建的一個對庫存進行操作(減少)的接口,我相信很多同志都能夠寫出這種加lock來保證高並發的時候,庫存不會出現超賣,這種做法的性能問題,不屬於我們這篇文章的討論范圍,我們要討論的是,這種寫法到底會不會造成超賣的情況出現呢?,如果這是傳統的企業內部應用,單體架構,如下圖所示,也能滿足需求;
using System; using System.Collections.Generic; using System.Linq; using System.Threading.Tasks; using Microsoft.AspNetCore.Mvc; using StackExchange.Redis; namespace RedisDistributedLockMvc.Controllers { [Route("api/inv")] [ApiController] public class InvController : ControllerBase { private static readonly object LockObject = new object(); private static readonly ConfigurationOptions Options = new ConfigurationOptions() { EndPoints = { { "192.168.232.132", 6379 } }, Password = "123456" }; [HttpGet] public ActionResult<string> Get() { string msg = null; lock (LockObject) { int invQty = GetInvQty(); if (invQty > 0) { invQty = invQty - 1; SetInvQty(invQty); msg = $"扣減成功,當前庫存:{invQty}"; } else { msg = "扣減失敗,庫存不足"; } } Console.WriteLine(msg); return msg; } private int GetInvQty() { var qty = 0; using (var conn = ConnectionMultiplexer.Connect(Options)) { var db = conn.GetDatabase(); qty = Convert.ToInt32(db.StringGet("InvQty")); } return qty; } private void SetInvQty(int qty) { using (var conn = ConnectionMultiplexer.Connect(Options)) { var db = conn.GetDatabase(); db.StringSet("InvQty", qty); } } } }
隨着業務的越來越復雜,這種單體架構的形式,已經滿足不了我們的正常業務需求,很多公司演變成了下面這種架構模式;
下面是我在測試環境搭建的過程(為了讓自己有更深刻的體會,我建議大家按照上面的架構圖搭建一個簡單的環境);
1、把上面的AspNetCore代碼發布到Centos 7機器上,分別指向該機器的不同端口(5000,5001),等同於部署了兩份;
2、在Centos 7機器上安裝Nginx,然后修改nginx配置文件,指向剛剛配置配置的地址;
特別說明:如果覺得上面操作很難,在Centos中,可以通過yum源來安裝相應的軟件,通過使用xshell來編寫命令,如果是對文件操作的,不熟悉命令可以用WinScp這種軟件進行可視化修改完后保存。然后需要特別注意的是,防火牆以及相關的端口和服務是否啟動。
除此之外還要特別留意,Nginx是一個進程,剛剛兩個不同的端口,分別對應不同的進程,這三個進程之間的安全是通過叫(seLinux)來管理的,如果nginx配置好之后,外網訪問還是報502,可以嘗試着把seLinux關閉(當然不推薦關閉,也有相關的解決方案)。
當然,如果有同學想嘗試這個過程,碰到問題的也可以聯系我:QQ:3484677573,說明是在博客園看到的即可。
搭建環境完畢之后,接下來開始我們的測試;
測試之前,因為要模擬高並發的環境,我們采用的jmeter作為我們壓測的工具,簡單的使用教程如下:
首先從官網下載和安裝jmeter,安裝完之后,找到安裝下的bin目錄,找到jmeter.bat,雙擊即可啟動jmeter。
打開jmeter之后,添加線程組,比較簡單,可以指定線程的個數等等。
接下來再添加一個HttpRequest,如下圖所示,根據提示,輸入相關的內容,然后點擊上面的運行,即可開始測試。
至此,測試工具也准備OK了,那我們開始測試,先假設我們Redis里面有50個庫存;
接下來在centos啟動linux,以及運行我們的服務,如下圖所示(5000和50001);
使用jmeter進行壓測,調用,看看是否會出現超賣的情況。
1、先開啟50個線程,進行壓測,看看輸出結果,兩個服務輸出的結果如下:
把結果設置成50,確實出現了超賣的情況,只要兩台機器輸出的當前庫存是一樣,就說明出現了超賣。
說明我們最開始的那段代碼在分布式環境下,或者在我們最常見的負載均衡部署方式下面是不行的,所以就提出了我們分布式鎖的解決方案。
PS:我以前也覺得上面這種加鎖的方法,好像是不能用在分布式環境中,經過上面這么一折騰,我印象更加深刻了,也有了更清晰的認識。
分布式鎖的解決方案
分布式鎖的解決方案,在業界內也有很多,也有很多成熟的框架,下面我們介紹一種基於Redis來實現的分布式鎖解決方案;
先解釋一下,前面的做法為什么不行和我們為什么要采取redis來做分布式鎖
1、上面這種Lock屬於進程內的鎖,當只有一個進程的時候(只部署了一台服務器)是沒有問題的,當存在多台服務器的時候,就會出問題;
2、之所以采取Redis來做分布式鎖,Redis是單線程的,當我們有N個請求同時到達的時候,它會通過隊列的形式變成串行訪問;
話不多說,直接看代碼
這個版本的分布式鎖,我們做了最簡單的考慮
1、鎖超時的問題(通過對Redis官網給出的SetNx方法,對應的就是StackExchange.dll里面的 db.StringSet("InvQty222", "111", TimeSpan.FromSeconds(900), When.NotExists, CommandFlags.None);
2、執行過程中,可能出異常的情況,在finally里面釋放鎖;
private static readonly object LockObject = new object(); private static readonly ConfigurationOptions Options = new ConfigurationOptions() { EndPoints = { { "192.168.232.132", 6379 } }, Password = "123456" }; [HttpGet] public ActionResult<string> Get() { #region 用Lock方式實現鎖 //string msg = null; //lock (LockObject) //{ // int invQty = GetInvQty(); // if (invQty > 0) // { // invQty = invQty - 1; // SetInvQty(invQty); // msg = $"扣減成功,當前庫存:{invQty}"; // } // else // { // msg = "扣減失敗,庫存不足"; // } //} //Console.WriteLine(msg); //return msg; #endregion #region Redis實現的第一版本分布式鎖 string msg = null; var isSuccess = SetLockVersion1("1"); //如果key存在返回的就是false //如果key不存在返回,就set,返回true //除此之外,我們還要考慮的是這把鎖的超時時間, //如果這把鎖一直不釋放(執行過程卡住了,那么要考慮把鎖超時) if (isSuccess) { try { int invQty = GetInvQty(); if (invQty > 0) { invQty = invQty - 1; SetInvQty(invQty); msg = $"扣減成功,當前庫存:{invQty}"; } else { msg = "扣減失敗,庫存不足"; } } finally { //還要考慮執行過程中,如果出現了異常,也要把鎖給釋放掉。 UnLockVersion1(); //釋放鎖; } } else { msg = "資源正忙,請刷新后重試"; } Console.WriteLine(msg); return msg; #endregion } private int GetInvQty() { var qty = 0; using (var conn = ConnectionMultiplexer.Connect(Options)) { var db = conn.GetDatabase(); qty = Convert.ToInt32(db.StringGet("InvQty")); } return qty; } private void SetInvQty(int qty) { using (var conn = ConnectionMultiplexer.Connect(Options)) { var db = conn.GetDatabase(); db.StringSet("InvQty", qty); } } private bool SetLockVersion1(string value) { using (var conn = ConnectionMultiplexer.Connect(Options)) { var db = conn.GetDatabase(); var flag= db.StringSet("LockValue", value, TimeSpan.FromSeconds(900), When.NotExists, CommandFlags.None); //如果存在了返回false,不存在才返回true; db.KeyExpire("LockValue", TimeSpan.FromSeconds(10)); return flag; } } private bool UnLockVersion1() { using (var conn = ConnectionMultiplexer.Connect(Options)) { var db = conn.GetDatabase(); return db.KeyDelete("LockValue"); } }
按照上面的程序,我們再把代碼部署到centos上,然后利用jmeter進行壓測;
經過我們這么一折騰,好像超賣的現象沒有出現了;但是我們上面的做法還是比較的簡單,很多情況都沒有考慮在里面,就比如下面這幾種情況;
問題一、鎖失效問題,問題根源,A線程加的鎖,被B線程釋放了。
為了解決這種情況,我們可以在進來的時候,存一個clientId到Redis的鎖里面,再失效key的時候,判斷一下,當前clientId和redis鎖里面的值是否一致,如果一致,就才釋放。
string msg = null; string clientId = Guid.NewGuid().ToString(); var isSuccess = SetLockVersion1(clientId); //如果key存在返回的就是false //如果key不存在返回,就set,返回true //除此之外,我們還要考慮的是這把鎖的超時時間, //如果這把鎖一直不釋放(執行過程卡住了,那么要考慮把鎖超時) if (isSuccess) { try { int invQty = GetInvQty(); if (invQty > 0) { invQty = invQty - 1; SetInvQty(invQty); msg = $"扣減成功,當前庫存:{invQty}"; } else { msg = "扣減失敗,庫存不足"; } } finally { if (clientId.Equals(GetLockValue())) { //還要考慮執行過程中,如果出現了異常,也要把鎖給釋放掉。 UnLockVersion1(); //釋放鎖; } } } else { msg = "資源正忙,請刷新后重試"; } Console.WriteLine(msg); return msg;
問題二:鎖超時的問題,如果客戶端1需要執行這把鎖的時間大於鎖設定的超時時間,該怎么做呢
1、開啟一個守護線程(后台線程),假如你鎖的設置30秒超時,那你每隔10秒去檢查是不是還是client1持有鎖,如果是那就延長10秒,類似於Watch dog思路。
這個守護線程要注意的點就是,如果鎖都沒人使用了,這個守護線程要及時的關閉,不能一直開啟着,如果不需要延長時間即不必要去延長。