一、單進程多線程的鎖--線程鎖
鎖住線程的鎖叫線程鎖,像C#中的lock,Monitor,讓線程排隊,同一時刻只能有一個線程進來,讓線程同步排隊。
二、多進程的鎖--分布式鎖
鎖住進程的鎖就叫分布式鎖,是鎖住進程的一種機制,讓進程排隊。
三、電商秒殺場景
1、單體架構
並發量不夠,秒殺服務只能並發1000,而客戶端同時發送3000個請求。
2、集群架構
這時候就需要多兩個角色,一個角色是網關,一個角色是秒殺集群,網關把用戶請求轉發到3個秒殺服務,這樣每個秒殺服務並發1000個請求,就能夠滿足客戶端同時發送3000個請求。
四、秒殺服務集群帶來新的問題
第1個請求進入到秒殺服務1里面,查詢數據庫商品庫存是10,判斷有庫存,扣減庫存,更新數據庫,當前庫存是9。
第2個請求進入到秒殺服務2里面,查詢數據庫商品庫存是10,判斷有庫存,扣減庫存,更新數據庫,當前庫存是9。
第3個請求進入到秒殺服務3里面,查詢數據庫商品庫存是10,判斷有庫存,扣減庫存,更新數據庫,當前庫存是9。
實際庫存只減少了1個,但是同1個商品被3個人秒殺到了,這就是超賣問題。
五、分布式鎖解決什么問題?
分布式系統中,涉及到多個進程共享資源的時候,就需要使用分布式鎖。
誰持有了鎖,誰才能操作數據庫扣減庫存。
六、運行效果
1、單進程發起20個線程模擬20個用戶並發請求,秒殺商品,會發現20個線程,20個請求秒殺到同1個商品。
2、對於單進程可以加lock鎖解決超賣問題
商品庫存有10個,開啟20個線程秒殺商品,有10個請求分別秒殺到不同的商品,另外10個線程沒有秒殺到商品,因為庫存只有10個。
3、我現在把相同的代碼Copy一份,新建個工程MyRedis.SecKill.MultiProcess.Other,也同樣使用了lock鎖,快速的啟動2個進程,每個進程中開啟20個線程就發現lock鎖不住了,lock鎖失效了,同一個商品編號10被2個不同的進程中的線程秒殺到了。
我們看到單進程通過加lock鎖可以保證不發生超賣問題,10個線程秒殺到商品,商品編號不同,另外10個線程沒有秒殺到商品。
但是因為為了提高並發量,現在是秒殺服務集群提供秒殺服務了,我們在兩個秒殺服務進程中都開啟20個線程去秒殺商品,就會發現如圖所示控制不住了,兩個進程中的線程都秒殺到同一個商品了(這里用商品庫存當做商品編號),那么如何解決跨進程並發引起的商品超賣問題?這就需要分布式鎖了。
七、封裝Redis分布式鎖--解決跨進程並發秒殺超賣問題
1、秒殺服務端
namespace MyRedis.SecKill.MultiProcess.SecKill { /// <summary> /// 商品秒殺服務 /// </summary> public class ProductSecKill { /// <summary> /// 秒殺方法 /// </summary> public void SecKillProduct() { RedisLock redisLock = new RedisLock(); redisLock.Lock(); //lock (this)//只是適合單進程 //{ // 1、獲取商品庫存 var productStock = GetPorductStocks(); // 2、判斷商品庫存是否為空 if (productStock.Conut == 0) { // 2.1 秒殺失敗消息 Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}:不好意思,秒殺已結束,商品編號:{productStock.Conut}"); redisLock.UnLock(); return; } // 3、秒殺成功消息 Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}:恭喜你,秒殺成功,商品編號:{productStock.Conut}"); // 4、扣減商品庫存 SubtracPorductStocks(productStock); //} redisLock.UnLock(); } /// <summary> /// 獲取商品庫存 /// </summary> /// <returns></returns> private Product_Stock GetPorductStocks() { using (ShoppingEntities shoppingEntities = new ShoppingEntities()) { // 1、查詢數據庫獲取庫存,獲取第一個商品的庫存數 Product_Stock productStock = shoppingEntities.Product_Stock.FirstOrDefault(s => s.Id == 1); // 2、返回庫存 return productStock; } } /// <summary> /// 扣減商品庫存 /// </summary> private void SubtracPorductStocks(Product_Stock stocks) { using (ShoppingEntities shoppingEntities = new ShoppingEntities()) { // 1、扣減商品庫存 Product_Stock updateStocks = shoppingEntities.Product_Stock.FirstOrDefault(s => s.Id == stocks.Id); updateStocks.Conut = stocks.Conut - 1; // 2、更新數據庫 shoppingEntities.SaveChanges(); } } } }
2、秒殺客戶端
namespace MyRedis.SecKill.MultiProcess { class Program { static void Main(string[] args) { // 1、開始秒殺 ClientRequest.SendRequest(20); Console.ReadKey(); } } }
namespace MyRedis.SecKill.MultiProcess.SecKill { class ClientRequest { /// <summary> /// 客戶端請求 /// </summary> /// <param name="threadCount">線程數</param> public static void SendRequest(int threadCount) { // 1、商品秒殺服務 ProductSecKill productSecKill = new ProductSecKill(); // 2、創建20個請求來秒殺 for (int i = 0; i < threadCount; i++) { Thread thread = new Thread(() => { productSecKill.SecKillProduct(); }); thread.Start(); } } } }
3、Redis分布式鎖
封裝分布式鎖4要素
3.1、鎖名
3.2、加鎖操作
鎖對象,也就是誰持有這把鎖,持有鎖的才能解鎖
3.3、解鎖操作
3.4、鎖超時時間
namespace MyRedis.SecKill.MultiProcess.Locks { /// <summary> /// redis分布式鎖 /// 分布式鎖四要素 /// 1、鎖名 /// 2、加鎖操作 /// 3、解鎖操作 /// 4、鎖超時時間 /// </summary> class RedisLock { // 1、redis連接管理類 private ConnectionMultiplexer connectionMultiplexer = null; // 2、redis數據操作類 private IDatabase database = null; public RedisLock() { connectionMultiplexer = ConnectionMultiplexer.Connect("localhost:6379"); database = connectionMultiplexer.GetDatabase(0); } /// <summary> /// 1、加鎖 /// </summary> public void Lock() { // 1、redis加鎖api--LockTake // key--鎖名--redis_lock // value--鎖對象(誰持有這把鎖)--進程Id+線程Id // expiry--鎖超時時間,為什么?解鎖死鎖問題! // 2、如果加鎖失敗?循環加鎖,對於未知的事情用循環 while (true) { bool flag = database.LockTake("redis_lock", "ProcessNo1" +Thread.CurrentThread.ManagedThreadId, TimeSpan.FromSeconds(10)); // 3、如果加鎖成功,則退出循環 if (flag) { break; } // 3.1 加鎖失敗,線程休眠下,走循環,再嘗試加鎖 Thread.Sleep(200); } } /// <summary> /// 2、解鎖 /// </summary> public void UnLock() { //1、redis解鎖api--LockRelease // key--鎖名--redis_lock // value--鎖對象(誰持有這把鎖)--進程Id+線程Id--使加鎖和解鎖是同一個對象 // 2、如果解鎖失敗?循環解鎖,對於未知的事情用循環 bool flag = database.LockRelease("redis_lock", "ProcessNo1" +Thread.CurrentThread.ManagedThreadId); while (true) { // 3、如果解鎖成功,則退出循環 if (flag) { break; } // 3.1 解鎖失敗,線程休眠下,走循環,再嘗試解鎖 Thread.Sleep(200); } // 4、關閉資源 connectionMultiplexer.Dispose(); } } }
八、再次運行效果
最后我們發現庫存36個商品,2個進程,每個進程開啟20個線程,都是不同的商品編號沒有秒殺到同一件商品。
九、項目結構
十、思考Redis集群環境死鎖
如果在Redis Master上持有了鎖,但是Redis Master宕機了,需要把Redis Slave提成Redis Master,但是原來的Redis Master的鎖沒有釋放,造成死鎖了怎么辦?