為什么要用分布式鎖?
先上一張截圖,這是在瀏覽別人的博客時看到的.
在了解為什么要用分布式鎖之前,我們應該知道到底什么是分布式鎖.
鎖按照不同的維度,有多種分類.比如
1.悲觀鎖,樂觀鎖;
2.公平鎖,非公平鎖;
3.獨享鎖,共享鎖;
4.線程鎖,進程鎖;
等等.
我們平時用的鎖,比如 lock,它是線程鎖,主要用來給方法,代碼塊加鎖.由於進程的內存單元是被其所有線程共享的,所以線程鎖控制的實際是多個線程對同一塊內存區域的訪問.
有線程鎖,就必然有進程鎖.顧名思義,進程鎖的目的是控制多個進程對共享資源的訪問.因為進程之間彼此獨立,各個進程是無法控制其他進程對資源的訪問,所以只能通過操作系統來控制.比如 Mutex.
但是進程鎖有一個前提,那就是需要多個進程在同一個系統中,如果多個進程不在同一個系統,那就只能使用分布式鎖來控制了.
分布式鎖是控制分布式系統中不同系統之間訪問共享資源的一種鎖實現.它和線程鎖,進程鎖的作用都是一樣,只是范圍不一樣.
所以要實現分布式鎖,就必須依靠第三方存儲介質來存儲鎖的信息.因為各個進程之間彼此誰都不服誰,只能找一個帶頭大哥咯;
以下示例需引用NUGET: CSRedisCore
示例一
CSRedisClient redisClient = new CSRedis.CSRedisClient("127.0.0.1:6379,defaultDatabase=0"); var lockKey = "lockKey"; var stock = 5;//商品庫存 var taskCount = 10;//線程數量 redisClient.Del(lockKey);//測試前,先把鎖刪了. for (int i = 0; i < taskCount; i++) { Task.Run(() => { //獲取鎖 do { //setnx : key不存在才會成功,存在則失敗. var success = redisClient.SetNx(lockKey, 1); if (success == true) { break; } Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再嘗試獲取鎖 } while (true); Console.WriteLine($"線程:{Task.CurrentId} 拿到了鎖,開始消費"); if (stock <= 0) { Console.WriteLine($"庫存不足,線程:{Task.CurrentId} 搶購失敗!"); redisClient.Del(lockKey); return; } stock--; //模擬處理業務 Thread.Sleep(TimeSpan.FromSeconds(new Random().Next(1, 3))); Console.WriteLine($"線程:{Task.CurrentId} 消費完畢!剩余 {stock} 個"); //業務處理完后,釋放鎖. redisClient.Del(lockKey); }); }
運行結果:
看起來貌似沒毛病,實際上上述代碼有個致命的問題:
當某個線程拿到鎖之后,如果系統崩潰了,那么鎖永遠都不會被釋放.因此,我們應該給鎖加一個過期時間,當時間到了,還沒有被主動釋放,我們就讓redis釋放掉它,以保證其他消費者可以拿到鎖,進行消費.
這里給鎖加過期時間也有講究,不能拿到鎖后再加,比如:
......
//setnx : key不存在才會成功,存在則失敗. var success = redisClient.SetNx(lockKey, 1); if (success == true) { redisClient.Set(lockKey, 1, expireSeconds: 5); break; }
這樣操作的話,獲取鎖和設置鎖的過期時間就不是原子操作,同樣會出現上面提到的問題.Redis 提供了一個合而為一的操作可以解決這個問題.
//set : key存在則失敗,不存在才會成功,並且過期時間5秒 var success = redisClient.Set(lockKey, 1, expireSeconds: 5, exists: RedisExistence.Nx);
這個問題雖然解決了,但隨之產生了一個新的問題:
假設有3個線程A,B,C
當線程A拿到鎖后執行業務的時候超時了,超過了鎖的過期時間還沒執行完,這時候鎖被Redis釋放了,
於是線程B拿到了鎖並開始執行業務邏輯.
當線程B的業務邏輯還沒執行完的時候,線程A的業務邏輯執行完了,於是乎就跑去釋放掉了鎖.
這時候線程C就可以拿到鎖開始執行它的業務邏輯.
這不就亂套了么...
因此,線程在釋放鎖的時候應該判斷這個鎖還屬不屬於自己.
所以,在設置鎖的時候,redis的value值不能像上面代碼那樣,隨便給個1,而應該給一個隨機值,代表當前線程.
var id = Guid.NewGuid().ToString("N"); //獲取鎖 do { //set : key存在則失敗,不存在才會成功,並且過期時間5秒 var success = redisClient.Set(lockKey, id, expireSeconds: 5, exists: RedisExistence.Nx); if (success == true) { break; } Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再嘗試獲取鎖 } while (true); Console.WriteLine($"線程:{Task.CurrentId} 拿到了鎖,開始消費");
.........
//業務處理完后,釋放鎖. var value = redisClient.Get<string>(lockKey); if (value == id) { redisClient.Del(lockKey); }
完美了嗎?
不完美.還是老生常談的問題,取value和刪除key 分了兩步走,不是原子操作.
並且,這里還不能用pipe,因為需要根據取到的value來決定下一個操作.上面設置過期時間倒是可以用pipe.
所以,這里只能用lua.
2020.10.09 補:將庫存放到redis
完整的代碼如下:
CSRedisClient redisClient = new CSRedis.CSRedisClient("127.0.0.1:6379,defaultDatabase=0"); var lockKey = "lockKey"; var stockKey = "stock"; redisClient.Set(stockKey, 5);//商品庫存 var releaseLockScript = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";//釋放鎖的redis腳本 redisClient.Del(lockKey);//測試前,先把鎖刪了. Parallel.For(0, 10, i => { var id = Guid.NewGuid().ToString("N"); //獲取鎖 do { //set : key存在則失敗,不存在才會成功,並且過期時間5秒 var success = redisClient.Set(lockKey, id, expireSeconds: 5, exists: RedisExistence.Nx); if (success == true) { break; } Thread.Sleep(TimeSpan.FromSeconds(1));//休息1秒再嘗試獲取鎖 } while (true); Console.WriteLine($"線程:{Task.CurrentId} 拿到了鎖,開始消費"); //扣減庫存 var currentStock = redisClient.IncrBy(stockKey, -1); if (currentStock < 0) { Console.WriteLine($"庫存不足,線程:{Task.CurrentId} 搶購失敗!"); redisClient.Eval(releaseLockScript, lockKey, id); return; } //模擬處理業務,這里不考慮失敗的情況 Thread.Sleep(TimeSpan.FromSeconds(new Random().Next(1, 3))); Console.WriteLine($"線程:{Task.CurrentId} 消費完畢!剩余 {currentStock} 個"); //業務處理完后,釋放鎖. redisClient.Eval(releaseLockScript, lockKey, id); });
這篇文章只介紹了單節點Redis的分布式鎖,因為單節點,所以不是高可用.
多節點Redis則需要用官方介紹的RedLock,這玩意有點繞,我需要捋一捋.